import {
  Tagged,
  asyncIter,
  asyncMap,
  enforce,
  enforce_nonnull,
  path_index,
  reset_to_defaults,
  same,
} from "vscript";
import * as VAPI from "../artefacts/vapi/index.js";
import {
  BackwardsCompatibleInterfaceSpec,
  InterfaceSelector,
  collect_rtp_interfaces,
  extract_port_indices,
  getHostingPort,
  resolve_interfaces_backwards_compatibly,
  splitIPv4Address,
} from "./network.js";
import { random_choice, random_number } from "./random.js";
import {
  ResourceCount,
  audio_ref,
  delete_row,
  find_vm_and_count,
  hosting_vm,
  video_ref,
} from "./utils.js";
import { is_higher_than_3G } from "./video.js";

// Callback that derives the destination address for one transmitter leg
// from its source address, media type, transmitter index and redundancy leg.
export type DstAddressFn = (pars: {
  src: string; // ip address
  mediaType: "audio" | "video" | "meta" | "passthrough" | "mipmap";
  transmitterIndex: number;
  mid: "primary" | "secondary"; // redundancy leg (2022-7 style primary/secondary)
}) => string;

/**
 * Writes (mode === "set") or verifies (mode === "check") the destination
 * address of a single IP configuration leg according to `dst_address_fn`.
 * Legs without a source address are reset to their defaults; a null leg is
 * ignored. In "check" mode a mismatching non-empty status throws.
 */
const set_check_ref_ip_config = async (
  ref_ip_config: VAPI.RTPTransmitter.IpConfig | null,
  mediaType: "audio" | "video" | "meta",
  mid: "primary" | "secondary",
  mode: "set" | "check",
  media_stream_index: number,
  dst_address_fn: DstAddressFn,
) => {
  if (!ref_ip_config) return;
  const vlan_status = await ref_ip_config.src_address_vlan.status.read();
  const src = vlan_status.src_address;
  if (!src) {
    await reset_to_defaults(ref_ip_config);
    return;
  }
  const dstShouldBe = dst_address_fn({
    mid,
    src,
    mediaType,
    transmitterIndex: media_stream_index,
  });
  if (mode === "set") {
    await ref_ip_config.dst_address.command.write(dstShouldBe);
    return;
  }
  const dstIs = await ref_ip_config.dst_address.status.read();
  if (dstIs && dstShouldBe !== dstIs) {
    throw new Error(
      `${ref_ip_config.raw.kwl}.dst_address.status should be ${dstShouldBe}, but is ${dstIs}`,
    );
  }
};

/**
 * Applies (or verifies, depending on `mode`) the canonical destination
 * addresses of every IP-configuration leg of the given media streamer.
 * Plain MediaStreamers carry a single "media" (audio) pair of legs; video
 * streamers carry video + meta pairs.
 */
export async function canonicalizeDstAddresses(
  media_stream:
    | VAPI.Any.RTPTransmitter.MediaStreamer
    | VAPI.Any.RTPTransmitter.MediaStreamerVideo,
  mode: "set" | "check",
  media_stream_index: number,
  dst_address_fn: DstAddressFn,
) {
  const legs =
    media_stream instanceof VAPI.AT1101.RTPTransmitter.MediaStreamer ||
    media_stream instanceof VAPI.AT1130.RTPTransmitter.MediaStreamer
      ? ([
          [media_stream.ip_configuration.media.primary, "audio", "primary"],
          [media_stream.ip_configuration.media.secondary, "audio", "secondary"],
        ] as const)
      : ([
          [media_stream.ip_configuration.video.primary, "video", "primary"],
          [media_stream.ip_configuration.video.secondary, "video", "secondary"],
          [media_stream.ip_configuration.meta.primary, "meta", "primary"],
          [media_stream.ip_configuration.meta.secondary, "meta", "secondary"],
        ] as const);
  await Promise.all(
    legs.map(([cfg, mediaType, mid]) =>
      set_check_ref_ip_config(
        cfg,
        mediaType,
        mid,
        mode,
        media_stream_index,
        dst_address_fn,
      ),
    ),
  );
}

/**
 * Deactivates the session, stamps its brief name with a short random token,
 * reactivates it, and resolves once the published SDP contains that token —
 * i.e. once the device has picked up the new configuration.
 */
export async function force_activate(
  tx: VAPI.Any.RTPTransmitter.Session,
): Promise<string> {
  await tx.active.command.write(false);
  // short pseudo-random token used to recognize "our" SDP once published
  const token = Math.random().toString(36).replace("0.", "");
  await tx.session_name.brief.command.write(token);
  await tx.active.command.write(true);
  // TODO: not sure when it uses sdp_a vs sdp_b?
  return tx.sdp_a.wait_until((sdp: string) => sdp.includes(token));
}

// TODO: extend to other streamer types
// Union of the transmitter-side streamer flavors supported by the
// hosting-session helpers below.
export type AnyStreamer =
  | VAPI.Any.RTPTransmitter.AudioStreamer
  | VAPI.Any.RTPTransmitter.VideoStreamer;

// NOTE: will throw if hosting_session is null!
export async function get_hosting_session(
  streamer: AnyStreamer,
): Promise<VAPI.Any.RTPTransmitter.Session> {
  const session = await streamer.generic.hosting_session.status.read();
  return enforce_nonnull(session);
}

// Deactivates the session hosting `streamer` (throws if there is none).
export async function deactivate_hosting_session(
  streamer: AnyStreamer,
): Promise<void> {
  const session = await get_hosting_session(streamer);
  await session.active.command.write(false);
}

// Force-activates the session hosting `streamer` (throws if there is none)
// and resolves with its refreshed SDP.
export async function force_activate_hosting_session(
  streamer: AnyStreamer,
): Promise<string> {
  const session = await get_hosting_session(streamer);
  return force_activate(session);
}

/**
 * Writes one of the supported video transport formats to `vtf`'s
 * configuration. Unknown selectors fall back to ST2022_6.
 *
 * @param tf   abstract transport-format selector
 * @param vtf  video streamer whose configuration is written
 * @param meta when true, embeds ST 2110-40 metadata (only honored by the
 *             ST 2110-20 packing modes; the other variants hard-code it)
 */
export async function set_transport_format(
  tf: VAPI.Video.TransportFormat,
  vtf: VAPI.Any.RTPTransmitter.VideoStreamer,
  meta?: boolean,
) {
  switch (tf) {
    default: // deliberate fall-through: unknown selectors behave like ST2022_6
    case "ST2022_6":
      await vtf.configuration.transport_format.command.write({
        variant: "ST2022_6",
        value: {},
      });
      break;
    case "ST2042_raw":
      await vtf.configuration.transport_format.command.write({
        variant: "ST2042",
        value: { add_st2110_40: false, compression: "C_2_5" },
      });
      break;
    case "ST2110_BPM":
      await vtf.configuration.transport_format.command.write({
        variant: "ST2110_20",
        value: {
          add_st2110_40: meta ?? false,
          packing_mode: "BPM",
          transmit_scheduler_uhd: false,
        },
      });
      break;
    case "ST2110_GPM":
      await vtf.configuration.transport_format.command.write({
        variant: "ST2110_20",
        value: {
          add_st2110_40: meta ?? false,
          packing_mode: "GPM",
          transmit_scheduler_uhd: false,
        },
      });
      break;
    case "ST2110_22_JpegXS":
      await vtf.configuration.transport_format.command.write({
        variant: "JPEG_XS",
        value: {
          add_st2110_40: false,
          compression: { variant: "Ratio", value: { ratio: 5 } },
          lvl_weight_mode: "peak_signal_noise_ratio",
          omit_mandatory_pre_header: false,
        },
      });
      break;
  }
}

/**
 * Reads the ethernet counters of the hardware tx stream backing the given
 * transmitter. Falls back to an all-zero counter when no connected hardware
 * exposes a stream counter.
 */
export async function get_packets_per_second(
  tx:
    | VAPI.Any.RTPTransmitter.VideoStreamer
    | VAPI.Any.RTPTransmitter.AudioStreamer,
) {
  // shared fallback when no hardware counter can be resolved
  const zero_counter: VAPI.EthernetStats.Counter = {
    bytes_per_sec: 0,
    bytes_total: 0,
    packets_per_sec: 0,
    packets_total: 0,
  };
  const video_hardware = async (
    vtx: VAPI.Any.RTPTransmitter.VideoStreamer,
  ): Promise<VAPI.EthernetStats.Counter> => {
    for (const wrapped of await vtx.connected_hardware.router_encoder.rows()) {
      const router = await wrapped.wrapped_reference.read();
      // NOTE: the original code distinguished AT1130 HDRVideo routers via
      // instanceof, but both branches read the very same
      // related_references.tx_stream_counter — so one path suffices
      if (router) {
        const c = await router.related_references.tx_stream_counter.read();
        if (c) {
          return await c.read();
        }
      }
    }
    return zero_counter;
  };
  const audio_hardware = async (
    atx: // renamed from `tx` to avoid shadowing the outer parameter
      | VAPI.AT1101.RTPTransmitter.AudioStreamer
      | VAPI.AT1130.RTPTransmitter.AudioStreamer,
  ): Promise<VAPI.EthernetStats.Counter> => {
    // AT1130 nests the encoder one level deeper than AT1101
    const en =
      atx instanceof VAPI.AT1130.RTPTransmitter.AudioStreamer
        ? await atx.connected_hardware.audio_encoder.wrapped_reference_encoder.read()
        : await atx.connected_hardware.audio_encoder.primary.wrapped_reference_encoder.read();
    if (en instanceof VAPI.AT1130.RTPTransmitter.AudioEncoder) {
      const ref = await en.flows.primary.hdr_ref.read();
      if (ref) {
        const hdr = await ref.related_references.tx_stream_counter.read();
        if (hdr) {
          const c = await hdr.read();
          if (c) {
            return c;
          }
        }
      }
    } else if (en instanceof VAPI.AT1101.RTPTransmitter.HDRAudio) {
      const c = await en.related_references.tx_stream_counter.read();
      if (c) {
        return await c.read();
      }
    }
    return zero_counter;
  };

  if (
    tx instanceof VAPI.AT1101.RTPTransmitter.VideoStreamer ||
    tx instanceof VAPI.AT1130.RTPTransmitter.VideoStreamer
  )
    return await video_hardware(tx);

  return await audio_hardware(tx);
}

// FIXME: should autoderive this from VideoFormat (not sure how)
// Mirror of the concrete VideoFormat union with each variant's settings
// object made Partial, so callers can specify only the fields they care
// about; missing fields are completed elsewhere in this module.
export type PartialVideoFormat =
  | {
      variant: "ST2022_6";
      value: Partial<VAPI.RTPTransmitter.ST20226Settings>;
    }
  | {
      variant: "ST2110_20";
      value:
        | Partial<VAPI.AT1101.RTPTransmitter.ST2110Settings>
        | Partial<VAPI.AT1130.RTPTransmitter.ST2110Settings>;
    }
  | {
      variant: "ST2042";
      value: Partial<VAPI.RTPTransmitter.ST2042Settings>;
    }
  | {
      variant: "JPEG_XS";
      value: Partial<VAPI.RTPTransmitter.JpegXsSettings>;
    };

// Hard requirements a caller may impose when a partial video format is
// completed with (otherwise randomized) defaults.
export type PartialVideoRequirements = {
  add_st2110_40?: boolean;
  transmit_scheduler_uhd?: boolean;
};

/**
 * Rough bandwidth estimate for an audio stream: payload
 * (channels × bytes-per-sample × 48 kHz) plus a 64-byte per-packet
 * overhead allowance, converted to bits per second.
 */
export function audio_bandwidth_estimate(pars: {
  transport_format: VAPI.RTPTransmitter.AudioConfigurationFormat;
}): {
  bits_per_sec: number;
} {
  const { num_channels, packet_time, format } = pars.transport_format;
  // packets per second per packet-time setting (fallback: 1 ms → 1000/s)
  const packets_per_sec: Record<string, number> = {
    p1: 1000,
    p0_666: 1500,
    p0_500: 2000,
    p0_333: 3000,
    p0_250: 4000,
    p0_125: 8000,
  };
  // bytes per sample per encoding (fallback: L16 → 2 bytes)
  const bytes_per_sample: Record<string, number> = {
    L16: 2,
    L24: 3,
    AM824: 4,
  };
  const pps = packets_per_sec[packet_time] ?? 1000;
  const bps = bytes_per_sample[format] ?? 2;
  return { bits_per_sec: (num_channels * bps * 48000 + pps * 64) * 8 };
}

/**
 * Very rough video bandwidth estimate: starts from 12 Gb/s (above-3G
 * standards) or 3 Gb/s, then applies the configured compression where the
 * transport format supports one.
 */
export function bandwidth_estimate(pars: {
  std?: VAPI.Video.Standard;
  transport_format: PartialVideoFormat | VAPI.Any.RTPTransmitter.VideoFormat;
}): { bits_per_sec: number } {
  // very rough approximation
  const uncompressed = pars.std && is_higher_than_3G(pars.std) ? 12e9 : 3e9;
  const tf = pars.transport_format;
  switch (tf.variant) {
    case "JPEG_XS": {
      const comp = tf.value?.compression ?? /* pessimistic worst-case estimate */ {
        variant: "Ratio",
        value: { ratio: 5 },
      };
      if (comp.variant === "Bandwidth") {
        return { bits_per_sec: comp.value.max_data_rate };
      }
      return { bits_per_sec: uncompressed / comp.value.ratio };
    }
    case "ST2042": {
      const ratio_by_setting: Record<string, number> = {
        C_2_5: 2.5,
        C_3_33: 3.33,
        C_4: 4,
        C_4_44: 4.44,
        C_6_66: 6.66,
        C_8: 8,
      };
      const setting =
        tf.value?.compression ?? /* pessimistic worst-case estimate */ "C_2_5";
      // unknown settings leave the estimate uncompressed (ratio 1)
      return { bits_per_sec: uncompressed / (ratio_by_setting[setting] ?? 1) };
    }
    case "ST2022_6":
    case "ST2110_20":
    default:
      // uncompressed transports consume the full raw rate
      return { bits_per_sec: uncompressed };
  }
}

/**
 * Returns the bandwidth this transmitter is estimated to consume together
 * with the set of physical ports it transmits on (deduplicated across the
 * primary/secondary interfaces of its hosting session). Transmitters
 * without a hosting session report zero bandwidth and no ports.
 */
export async function reserved_bandwidth(
  tx:
    | VAPI.Any.RTPTransmitter.VideoStreamer
    | VAPI.Any.RTPTransmitter.AudioStreamer,
): Promise<{
  bits_per_sec: number;
  ports: Array<VAPI.Any.NetworkInterfaces.Port>;
}> {
  const session = await tx.generic.hosting_session.status.read();
  if (!session) return { bits_per_sec: 0, ports: [] };
  // collect the distinct hosting ports behind both session interfaces
  const ports = await (async () => {
    const result: Array<VAPI.Any.NetworkInterfaces.Port> = [];
    const ifcs = await session.interfaces.status
      .read()
      .then((ifcs) => [ifcs.primary, ifcs.secondary]);
    for (const ifc of ifcs) {
      if (!ifc) continue;
      const port = await getHostingPort(ifc);
      if (result.find((other_port) => same(port, other_port)) === undefined)
        result.push(port);
    }
    return result;
  })();

  if (
    tx instanceof VAPI.AT1130.RTPTransmitter.VideoStreamer ||
    tx instanceof VAPI.AT1101.RTPTransmitter.VideoStreamer
  ) {
    return {
      ports,
      ...bandwidth_estimate({
        // no standard constrained: fall back to a worst-case guess derived
        // from the configured bandwidth limit (12 Gb/s limit => UHD)
        std:
          (await tx.constraints.standard.status.read()) ??
          ((await tx.constraints.max_bandwidth.status.read()) === "b12_0Gb"
            ? "HD2160p60"
            : "HD1080p60"),
        transport_format: await tx.configuration.transport_format.status.read(),
      }),
    };
  }
  if (
    tx instanceof VAPI.AT1130.RTPTransmitter.AudioStreamer ||
    tx instanceof VAPI.AT1101.RTPTransmitter.AudioStreamer
  ) {
    return {
      ports,
      ...audio_bandwidth_estimate({
        transport_format: await tx.configuration.transport_format.status.read(),
      }),
    };
  }
  // unknown streamer flavor: report nothing rather than guessing
  return { ports: [], bits_per_sec: 0 };
}

/**
 * Normalizes an optional standard argument (single value, list, or
 * undefined) to a list; undefined means "all known standards".
 */
function collect_standards(
  standard?: VAPI.Video.Standard | VAPI.Video.Standard[],
): VAPI.Video.Standard[] {
  if (typeof standard === "string" && !!standard) {
    return [standard];
  }
  return standard ?? VAPI.Video.Enums.Standard;
}

// Port index tagged to avoid mixing it up with other numeric indices.
type PortIndex = Tagged<number, "port-index">;

// Aggregated bandwidth (bits/s) keyed by physical network port index.
type BandwidthPerPort = Map<PortIndex, { bits_per_sec: number }>;

/**
 * Sums the bandwidth currently reserved by all audio and all video
 * transmitters of `vm`, separately, keyed by network port. Returns two
 * empty maps when the vm has no RTP transmitter subtree.
 */
async function compute_blocked_bandwidth(
  vm: VAPI.VM.Any,
): Promise<{ audio: BandwidthPerPort; video: BandwidthPerPort }> {
  const blocked_bandwidth_audio: BandwidthPerPort = new Map();
  const blocked_bandwidth_video: BandwidthPerPort = new Map();
  const rtp_transmitter = vm.r_t_p_transmitter;
  if (rtp_transmitter) {
    // adds one transmitter's reserved bandwidth to `acc`, per port
    // (the audio and video loops previously duplicated this body verbatim)
    const accumulate = async (
      acc: BandwidthPerPort,
      tx:
        | VAPI.Any.RTPTransmitter.AudioStreamer
        | VAPI.Any.RTPTransmitter.VideoStreamer,
    ) => {
      const { bits_per_sec, ports } = await reserved_bandwidth(tx);
      for (const port of ports) {
        const port_index = enforce_nonnull(path_index(port.raw.kwl));
        const prev = acc.get(port_index) ?? { bits_per_sec: 0 };
        acc.set(port_index, {
          bits_per_sec: bits_per_sec + prev.bits_per_sec,
        });
      }
    };
    await asyncIter(await rtp_transmitter.audio_transmitters.rows(), (tx) =>
      accumulate(blocked_bandwidth_audio, tx),
    );
    await asyncIter(await rtp_transmitter.video_transmitters.rows(), (tx) =>
      accumulate(blocked_bandwidth_video, tx),
    );
  }
  return { audio: blocked_bandwidth_audio, video: blocked_bandwidth_video };
}

/**
 * Estimates how many additional video transmitters with the given transport
 * format can be set up on `pars.vm`. The result is the minimum of the
 * device's runtime resource limit for the chosen variant (minus preserved
 * preexisting transmitters of the same variant) and the per-port bandwidth
 * budget. Returns 0 when the vm has no RTP transmitter subtree.
 */
export async function max_video_transmitters(
  pars: {
    vm: VAPI.VM.Any;
    transport_format: VAPI.Any.RTPTransmitter.VideoFormat | PartialVideoFormat;
    preexisting_transmitters: "bulldoze" | "preserve";
    log: (msg: string) => void;
    standard?: VAPI.Video.Standard | VAPI.Video.Standard[];
    bandwidth_constraints?: "obey" /* the default */ | "ignore";
  } & BackwardsCompatibleInterfaceSpec,
) {
  if (!pars.vm.r_t_p_transmitter) return 0;
  const rtp_transmitter = pars.vm.r_t_p_transmitter;
  const standards = collect_standards(pars.standard);
  const uhd_enabled = !!standards.find((std) => is_higher_than_3G(std));
  // upper bound imposed by per-port bandwidth budgets
  const max_num_by_bandwidth = async () => {
    if ((pars.bandwidth_constraints ?? "obey") === "ignore") return Infinity;
    const bandwidth_left: BandwidthPerPort = new Map();
    for (const port_index of await pars.vm.network_interfaces.ports.allocated_indices()) {
      const port = pars.vm.network_interfaces.ports.row(port_index);
      if (!(await port.supports_rtp.read())) continue;
      enforce(!bandwidth_left.has(port_index));
      // usable budget = link throughput minus the configured reserve (%)
      bandwidth_left.set(port_index, {
        bits_per_sec:
          enforce_nonnull(
            await port.max_throughput.wait_until(
              (bandwidth) => bandwidth !== null,
            ),
          ) *
          (1.0 -
            0.01 *
              (await enforce_nonnull(
                rtp_transmitter,
              ).settings.reserved_bandwidth.read())),
      });
    }
    // subtract bandwidth already blocked by audio transmitters and — when
    // preserving — by preexisting video transmitters
    const blocked_bandwidth = await compute_blocked_bandwidth(pars.vm);
    for (const d of [
      blocked_bandwidth.audio,
      ...(pars.preexisting_transmitters === "preserve"
        ? [blocked_bandwidth.video]
        : []),
    ]) {
      for (const [port_index, { bits_per_sec }] of d) {
        bandwidth_left.set(port_index, {
          bits_per_sec:
            enforce_nonnull(bandwidth_left.get(port_index)).bits_per_sec -
            bits_per_sec,
        });
      }
    }
    // cost of one transmitter: cheapest estimate across requested standards
    let bits_per_sec_per_tx = Infinity;
    for (const std of standards) {
      bits_per_sec_per_tx = Math.min(
        bits_per_sec_per_tx,
        bandwidth_estimate({ std, transport_format: pars.transport_format })
          .bits_per_sec,
      );
    }
    // greedily place transmitters onto their designated ports until one
    // runs out of budget
    // NOTE(review): assumes extract_port_indices never yields an empty
    // array; an empty result would make this loop spin forever — confirm
    let max_num = 0;
    while (true) {
      for (const port_index of extract_port_indices(pars, max_num)) {
        const entry = enforce_nonnull(bandwidth_left.get(port_index));
        if (entry.bits_per_sec < bits_per_sec_per_tx) return max_num;
        ++max_num;
        bandwidth_left.set(port_index, {
          bits_per_sec: entry.bits_per_sec - bits_per_sec_per_tx,
        });
      }
    }
  };
  // device resource limit for the requested transport variant
  const base_estimate = (() => {
    switch (pars.transport_format.variant) {
      case "JPEG_XS":
        return rtp_transmitter.runtime_constants.num_jpegxs;
      case "ST2022_6":
        return rtp_transmitter.runtime_constants.num_videotransmitters;
      case "ST2042":
        return uhd_enabled
          ? rtp_transmitter.runtime_constants.max_2042_uhd
          : rtp_transmitter.runtime_constants.max_2042;
      case "ST2110_20":
        return uhd_enabled
          ? rtp_transmitter.runtime_constants.max_st2110_20_uhd_transmitters
          : rtp_transmitter.runtime_constants.num_videotransmitters;
    }
  })();
  // preexisting same-variant transmitters count against the resource limit
  // unless they are going to be bulldozed anyway
  const blocked_transmitters = await (async () => {
    // TODO @ Clemens: please review
    if (pars.preexisting_transmitters === "bulldoze") return 0;
    const txs = await rtp_transmitter.video_transmitters.rows();
    // TODO @ Clemens: this doesn't distinguish between UHD & non-UHD variants
    return (
      await asyncMap(txs, async (tx) =>
        (await tx.configuration.transport_format.status.read()).variant ===
        pars.transport_format.variant
          ? 1
          : 0,
      )
    ).reduce((acc: number, n) => acc + n, 0);
  })();
  const leftover = base_estimate - blocked_transmitters;
  return leftover > 0
    ? Math.min(leftover, await max_num_by_bandwidth())
    : leftover;
}

/**
 * Estimates how many additional audio transmitters with the given transport
 * format can be set up on `pars.vm`, limited by per-port bandwidth and by
 * the device's audio transmitter count. Returns 0 when the vm has no RTP
 * transmitter subtree.
 */
export async function max_audio_transmitters(
  pars: {
    vm: VAPI.VM.Any;
    transport_format: VAPI.RTPTransmitter.AudioConfigurationFormat;
    preexisting_transmitters: "bulldoze" | "preserve";
    log: (msg: string) => void;
    bandwidth_constraints?: "obey" /* the default */ | "ignore";
  } & BackwardsCompatibleInterfaceSpec,
) {
  if (!pars.vm.r_t_p_transmitter) return 0;
  const rtp_transmitter = pars.vm.r_t_p_transmitter;

  // FIXME: max_audio_transmitters should account for bandwidth blocked by video transmitters,
  // and vice versa
  // upper bound imposed by per-port bandwidth budgets
  const max_num_by_bandwidth = async () => {
    if ((pars.bandwidth_constraints ?? "obey") === "ignore") return Infinity;
    const bandwidth_left: BandwidthPerPort = new Map();
    for (const port_index of await pars.vm.network_interfaces.ports.allocated_indices()) {
      const port = pars.vm.network_interfaces.ports.row(port_index);
      if (!(await port.supports_rtp.read())) continue;
      enforce(!bandwidth_left.has(port_index));
      // usable budget = link throughput minus the configured reserve (%)
      bandwidth_left.set(port_index, {
        bits_per_sec:
          enforce_nonnull(
            await port.max_throughput.wait_until(
              (bandwidth) => bandwidth !== null,
            ),
          ) *
          (1.0 -
            0.01 *
              (await enforce_nonnull(
                rtp_transmitter,
              ).settings.reserved_bandwidth.read())),
      });
    }
    // subtract bandwidth already blocked by video transmitters and — when
    // preserving — by preexisting audio transmitters
    const blocked_bandwidth = await compute_blocked_bandwidth(pars.vm);
    for (const d of [
      blocked_bandwidth.video,
      ...(pars.preexisting_transmitters === "preserve"
        ? [blocked_bandwidth.audio]
        : []),
    ]) {
      for (const [port_index, { bits_per_sec }] of d) {
        bandwidth_left.set(port_index, {
          bits_per_sec:
            enforce_nonnull(bandwidth_left.get(port_index)).bits_per_sec -
            bits_per_sec,
        });
      }
    }
    // never reassigned, hence const (was `let`)
    const bits_per_sec_per_tx = audio_bandwidth_estimate({
      transport_format: pars.transport_format,
    }).bits_per_sec;
    // greedily place transmitters onto their designated ports until one
    // runs out of budget
    // NOTE(review): assumes extract_port_indices never yields an empty
    // array; an empty result would make this loop spin forever — confirm
    let max_num = 0;
    while (true) {
      for (const port_index of extract_port_indices(pars, max_num)) {
        const entry = enforce_nonnull(bandwidth_left.get(port_index));
        if (entry.bits_per_sec < bits_per_sec_per_tx) return max_num;
        ++max_num;
        bandwidth_left.set(port_index, {
          bits_per_sec: entry.bits_per_sec - bits_per_sec_per_tx,
        });
      }
    }
  };

  const base_estimate = rtp_transmitter.runtime_constants.num_audiotransmitters;
  return base_estimate > 0
    ? Math.min(base_estimate, await max_num_by_bandwidth())
    : base_estimate;
}

/**
 * Completes a possibly-partial video transport format: fields the caller
 * left unspecified are filled with randomized defaults (compression ratio,
 * packing mode, …), while `pars.requirements` pins the ST 2110-40 and UHD
 * scheduler flags. ST2022_6 configs pass through unchanged.
 */
function to_full_video_format(pars: {
  transport_format: PartialVideoFormat | VAPI.Any.RTPTransmitter.VideoFormat;
  requirements?: PartialVideoRequirements;
}): VAPI.AT1130.RTPTransmitter.VideoFormat {
  const tf = pars.transport_format;
  const add_st2110_40 = pars.requirements?.add_st2110_40 ?? false;
  switch (tf.variant) {
    case "ST2022_6":
      return tf;
    case "JPEG_XS":
      return {
        variant: "JPEG_XS",
        value: {
          add_st2110_40,
          compression: {
            variant: "Ratio",
            value: { ratio: random_number(5, 40) },
          },
          lvl_weight_mode: random_choice([
            "visual_optimization",
            "peak_signal_noise_ratio",
          ]),
          omit_mandatory_pre_header: false,
          ...tf.value,
        },
      };
    case "ST2042":
      return {
        variant: "ST2042",
        value: {
          add_st2110_40,
          compression: random_choice(VAPI.Video.Enums.DiracCompression),
          ...tf.value,
        },
      };
    case "ST2110_20":
      return {
        variant: "ST2110_20",
        value: {
          add_st2110_40,
          packing_mode: random_choice(["BPM", "GPM"]),
          transmit_scheduler_uhd:
            pars.requirements?.transmit_scheduler_uhd ?? false,
          ...tf.value,
        },
      };
  }
}

/**
 * Fills in defaults (L16, 8 channels, 125 µs packet time) for any fields
 * the caller left out of a partial audio transport format.
 */
function to_full_audio_format(
  transport_format: Partial<VAPI.RTPTransmitter.AudioConfigurationFormat>,
): VAPI.RTPTransmitter.AudioConfigurationFormat {
  const defaults = {
    format: "L16",
    num_channels: 8,
    packet_time: "p0_125",
  } as const;
  return { ...defaults, ...transport_format };
}

/**
 * Maps an abstract transport-format name to its partial config object;
 * configs that are already object-shaped pass through unchanged.
 */
export function to_partial_config(
  tf:
    | VAPI.Video.TransportFormat
    | VAPI.Any.RTPTransmitter.VideoFormat
    | PartialVideoFormat,
): PartialVideoFormat {
  // object-shaped configs pass through unchanged
  if (typeof tf !== "string") return tf;
  switch (tf) {
    case "ST2022_6":
      return { variant: "ST2022_6", value: {} };
    case "ST2042_raw":
      return { variant: "ST2042", value: {} };
    case "ST2110_22_JpegXS":
      return { variant: "JPEG_XS", value: {} };
    case "ST2110_BPM":
      return { variant: "ST2110_20", value: { packing_mode: "BPM" } };
    case "ST2110_GPM":
      return { variant: "ST2110_20", value: { packing_mode: "GPM" } };
  }
  // unreachable for well-typed input; kept for parity with untyped callers
  return tf;
}

// NOTE: the setup helpers below share the CommonSetupOptions shape.

// Options shared by setup_video_transmitters and setup_audio_transmitters.
type CommonSetupOptions = {
  count: ResourceCount;
  // will be tried in the specified order, though maximization will overrule vm order
  // in case of count === MAX
  vms: VAPI.VM.Any[];
  // "bulldoze": tear down existing transmitters first; "preserve": keep them
  preexisting_transmitters: "bulldoze" | "preserve";
  log: (msg: string) => void;
  // schema used to derive per-leg destination addresses
  address_schema: DstAddressFn;
  bandwidth_constraints?: "obey" /* the default */ | "ignore";
  verbose?: boolean;
} & BackwardsCompatibleInterfaceSpec;

/**
 * Sets up `pars.count` video transmitters on the first vm (from `pars.vms`)
 * able to host them, points them at a shared video signal generator, and
 * wires each new transmitter to a hosting session with canonicalized
 * destination addresses. Returns the configured transmitter rows ([] when
 * the requested count resolves to zero); throws when no free VSG remains.
 */
export async function setup_video_transmitters(
  pars: CommonSetupOptions & {
    transport_format:
      | VAPI.Video.TransportFormat
      | VAPI.Any.RTPTransmitter.VideoFormat
      | PartialVideoFormat;
    requirements?: PartialVideoRequirements;
    standard?: VAPI.Video.Standard | VAPI.Video.Standard[];
    pattern?: VAPI.VideoSignalGenerator.TestPatternSelect;
  },
): Promise<
  | VAPI.AT1101.RTPTransmitter.VideoStreamerAsNamedTableRow[]
  | VAPI.AT1130.RTPTransmitter.VideoStreamerAsNamedTableRow[]
> {
  const partial_tf = to_partial_config(pars.transport_format);
  // pick a vm that can host the requested number of transmitters
  const [vm, count] = await find_vm_and_count(
    pars.vms,
    async (vm) =>
      await max_video_transmitters({
        ...pars,
        transport_format: partial_tf,
        vm,
      }),
    pars.count,
    {
      resource: "video transmitter",
      specs: JSON.stringify({
        transport_format: pars.transport_format,
        standard: pars.standard,
        bandwidth_constraints: pars.bandwidth_constraints,
        preexisting_transmitters: pars.preexisting_transmitters,
        pattern: pars.pattern,
      }),
    },
  );
  if (count === 0) return [];
  enforce(!!vm && !!vm.r_t_p_transmitter);
  // complete the partial format with (randomized) defaults
  const tf = to_full_video_format({
    transport_format: partial_tf,
    requirements: pars.requirements,
  });
  if (pars.preexisting_transmitters === "bulldoze") {
    await asyncIter(
      await vm.r_t_p_transmitter.video_transmitters.rows(),
      (tx) => abandon({ tx, delete_session_if_unused: true }),
    );
  }
  const standards = collect_standards(pars.standard);
  // find signal generators not already feeding incompatible transmitters
  const available_vsgs = await (async () => {
    // FIXME @ Clemens: check pattern parameter
    const all_instances = [
      ...enforce_nonnull(vm.video_signal_generator).instances,
    ];
    if (pars.preexisting_transmitters === "bulldoze") {
      return all_instances;
    }
    const blocked_instances: Array<VAPI.Any.VideoSignalGenerator.VSG> = [];
    await asyncIter(
      await enforce_nonnull(vm.r_t_p_transmitter).video_transmitters.rows(),
      async (tx) => {
        const v_src = (await tx.v_src.status.read()).source;
        if (!v_src) return;
        for (const instance of all_instances) {
          // if standards = [instance's std] && pattern == [instance's pattern], that instance won't
          // count as blocked as we're going to use the same signal anyway
          // NOTE(review): the condition below reads "(std matches AND no
          // pattern requested) OR pattern matches" — the comment above
          // suggests std and pattern should both be required; confirm the
          // intended grouping
          if (
            same(instance.output, v_src) &&
            !(
              (standards.length === 1 &&
                standards[0] === (await instance.standard.status.read()) &&
                !pars.pattern) ||
              pars.pattern === (await instance.pattern.read())
            )
          ) {
            if (!blocked_instances.find((i) => same(i, instance))) {
              blocked_instances.push(instance);
            }
            break;
          }
        }
      },
    );
    return all_instances.filter(
      (vsg) => !blocked_instances.find((other_vsg) => same(vsg, other_vsg)),
    );
  })();
  // allocate the transmitter rows (exact allocation only in bulldoze mode)
  const transmitters = await (async (): Promise<
    VAPI.Any.RTPTransmitter.VideoStreamer[]
  > => {
    switch (pars.preexisting_transmitters) {
      case "bulldoze":
        return await enforce_nonnull(
          vm.r_t_p_transmitter,
        ).video_transmitters.ensure_allocated(count, "exactly");
      case "preserve": {
        const result: Array<VAPI.Any.RTPTransmitter.VideoStreamer> = [];
        while (result.length < count) {
          result.push(
            enforce_nonnull(
              await vm.r_t_p_transmitter?.video_transmitters.create_row(),
            ),
          );
        }
        return result;
      }
    }
  })();
  // FIXME @ Clemens: should take this into account during VM selection
  if (available_vsgs.length === 0)
    throw new Error(
      `Unable to setup video transmitters: no VSGs left at ${vm}`,
    );
  // all transmitters share the first free VSG as their video source
  await available_vsgs[0].standard.command.write(standards[0]);
  if (pars.pattern) await available_vsgs[0].pattern.write(pars.pattern);
  await asyncIter(transmitters, async (tx, i) => {
    // deactivate while reconfiguring
    let hosting_session = await tx.generic.hosting_session.status.read();
    if (hosting_session) await hosting_session.active.command.write(false);

    await tx.configuration.transport_format.command.write(tf);
    await tx.constraints.standard.command.write(null);
    await tx.constraints.max_bandwidth.command.write(
      !!standards.find((std) => is_higher_than_3G(std)) ? "b12_0Gb" : "b3_0Gb",
    );

    // transmitters without a session get a fresh one, wired to the
    // requested interfaces and canonical destination addresses
    if (!hosting_session) {
      hosting_session = enforce_nonnull(
        await vm.r_t_p_transmitter?.sessions.create_row(),
      );
      await tx.generic.hosting_session.command.write(hosting_session);
      enforce(!!hosting_session);
      await hosting_session.interfaces.command.write(
        await resolve_interfaces_backwards_compatibly(pars, vm, i),
      );
      await canonicalizeDstAddresses(
        tx.generic,
        "set",
        enforce_nonnull(path_index(tx.raw.kwl)),
        pars.address_schema,
      );
    }
    await tx.v_src.command.write(video_ref(available_vsgs[0].output));
  });
  // FIXME: get rid of typecast
  return transmitters as
    | VAPI.AT1101.RTPTransmitter.VideoStreamerAsNamedTableRow[]
    | VAPI.AT1130.RTPTransmitter.VideoStreamerAsNamedTableRow[];
}

/**
 * Sets up `pars.count` audio transmitters on the first vm (from `pars.vms`)
 * able to host them, routes the 1 kHz signal generator into each, and wires
 * every new transmitter to a hosting session with canonicalized destination
 * addresses. Returns the configured transmitter rows ([] when the requested
 * count resolves to zero).
 */
export async function setup_audio_transmitters(
  pars: CommonSetupOptions & {
    transport_format?: Partial<VAPI.RTPTransmitter.AudioConfigurationFormat>;
  },
): Promise<
  | VAPI.AT1101.RTPTransmitter.AudioStreamerAsNamedTableRow[]
  | VAPI.AT1130.RTPTransmitter.AudioStreamerAsNamedTableRow[]
> {
  const transport_format = to_full_audio_format(pars.transport_format ?? {});
  // pick a vm that can host the requested number of transmitters
  const [vm, count] = await find_vm_and_count(
    pars.vms,
    async (vm) =>
      await max_audio_transmitters({ ...pars, transport_format, vm }),
    pars.count,
    {
      resource: "audio transmitter",
      specs: JSON.stringify({
        preexisting_transmitters: pars.preexisting_transmitters,
        bandwidth_constraints: pars.bandwidth_constraints,
        transport_format,
      }),
    },
  );
  if (count === 0) return [];
  enforce(!!vm && !!vm.r_t_p_transmitter);
  if (pars.preexisting_transmitters === "bulldoze") {
    await asyncIter(
      await vm.r_t_p_transmitter.audio_transmitters.rows(),
      (tx) => abandon({ tx, delete_session_if_unused: true }),
    );
  }
  // allocate the transmitter rows (exact allocation only in bulldoze mode)
  const transmitters = await (async (): Promise<
    VAPI.Any.RTPTransmitter.AudioStreamer[]
  > => {
    switch (pars.preexisting_transmitters) {
      case "bulldoze":
        return enforce_nonnull(
          vm.r_t_p_transmitter,
        ).audio_transmitters.ensure_allocated(count, "exactly");
      case "preserve": {
        const result: Array<VAPI.Any.RTPTransmitter.AudioStreamer> = [];
        while (result.length < count) {
          result.push(
            enforce_nonnull(
              await vm.r_t_p_transmitter?.audio_transmitters.create_row(),
            ),
          );
        }
        return result;
      }
    }
  })();
  await asyncIter(transmitters, async (tx, i) => {
    await tx.configuration.transport_format.command.write(transport_format);

    // deactivate while reconfiguring; transmitters without a session get a
    // fresh one, wired to the requested interfaces and canonical addresses
    let hosting_session = await tx.generic.hosting_session.status.read();
    if (hosting_session) await hosting_session.active.command.write(false);
    if (!hosting_session) {
      hosting_session = enforce_nonnull(
        await vm.r_t_p_transmitter?.sessions.create_row(),
      );
      await tx.generic.hosting_session.command.write(hosting_session);
      enforce(!!hosting_session);
      await hosting_session.interfaces.command.write(
        await resolve_interfaces_backwards_compatibly(pars, vm, i),
      );
      await canonicalizeDstAddresses(
        tx.generic,
        "set",
        enforce_nonnull(path_index(tx.raw.kwl)),
        pars.address_schema,
      );
    }
    // the 1 kHz generator lives under genlock on AT1130, top-level otherwise
    const ASG =
      vm instanceof VAPI.AT1130.Root
        ? enforce_nonnull(vm.audio_signal_generator).genlock.row(0).f48000
            .signal_1000hz.output
        : enforce_nonnull(vm.audio_signal_generator).signal_1000hz.output;
    await tx.a_src.command.write(audio_ref(ASG));
  });
  // FIXME: get rid of typecast
  return transmitters as
    | VAPI.AT1101.RTPTransmitter.AudioStreamerAsNamedTableRow[]
    | VAPI.AT1130.RTPTransmitter.AudioStreamerAsNamedTableRow[];
}

/**
 * Detaches a transmitter from its hosting RTP session. The session is
 * deactivated during the detach; afterwards it is either deleted (when no
 * other transmitter references it and the caller asked for cleanup) or
 * restored to its previous active state.
 */
export async function abandon(pars: {
  tx:
    | VAPI.Any.RTPTransmitter.AudioStreamer
    | VAPI.Any.RTPTransmitter.VideoStreamer;
  delete_session_if_unused: boolean;
}) {
  const session = await pars.tx.generic.hosting_session.status.read();
  if (!session) return; // nothing to detach from
  const was_active = await session.active.status.read();
  await session.active.command.write(false);
  const vtxs =
    (await session.video_transmitters.read()) as VAPI.Any.RTPTransmitter.VideoStreamer[];
  const atxs =
    (await session.audio_transmitters.read()) as VAPI.Any.RTPTransmitter.AudioStreamer[];
  // Does any transmitter other than pars.tx still reference this session?
  const references_other = (
    txs: Array<
      | VAPI.Any.RTPTransmitter.VideoStreamer
      | VAPI.Any.RTPTransmitter.AudioStreamer
    >,
  ) => txs.some((other) => !!other && !same(pars.tx, other));
  const still_in_use = references_other(vtxs) || references_other(atxs);
  await pars.tx.generic.hosting_session.command.write(null);
  if (!still_in_use) {
    if (pars.delete_session_if_unused) await delete_row(session);
  } else if (was_active) {
    await session.active.command.write(was_active);
  }
}

/**
 * Fills in per-variant defaults for a (possibly partial) video transport
 * format. Any field the caller provides in `vf.value` wins over the default
 * (the caller's fields are spread last).
 */
export function transport_defaults(
  vf: PartialVideoFormat,
): VAPI.AT1130.RTPTransmitter.VideoFormat {
  // Guard-clause chain instead of a switch; the final return covers the only
  // remaining variant, ST2110_20.
  if (vf.variant === "ST2022_6")
    return { variant: "ST2022_6", value: { ...vf.value } };
  if (vf.variant === "JPEG_XS")
    return {
      variant: "JPEG_XS",
      value: {
        add_st2110_40: true,
        compression: { variant: "Ratio", value: { ratio: 10 } },
        lvl_weight_mode: "visual_optimization",
        omit_mandatory_pre_header: false,
        ...vf.value,
      },
    };
  if (vf.variant === "ST2042")
    return {
      variant: "ST2042",
      value: {
        add_st2110_40: true,
        compression: "C_6_66",
        ...vf.value,
      },
    };
  return {
    variant: "ST2110_20",
    value: {
      add_st2110_40: true,
      transmit_scheduler_uhd: false,
      packing_mode: "GPM",
      ...vf.value,
    },
  };
}

// A primary/secondary pair of destination addresses; as produced by
// make_mcast, each entry is an "ip:port" string.
export type AddressTuple = {
  primary: string;
  secondary: string;
};
// Address set for one transmitter: the media streams plus an optional
// metadata stream pair (only populated for video transmitters in make_mcast;
// presumably the ancillary/ST2110-40 stream — confirm against receivers).
export type AddressPackage = {
  media: AddressTuple;
  meta?: AddressTuple;
};

export async function make_mcast(
  tx:
    | VAPI.AT1101.RTPTransmitter.AudioStreamerAsNamedTableRow
    | VAPI.AT1130.RTPTransmitter.AudioStreamerAsNamedTableRow
    | VAPI.AT1101.RTPTransmitter.VideoStreamerAsNamedTableRow
    | VAPI.AT1130.RTPTransmitter.VideoStreamerAsNamedTableRow,
  rtp?: { primary: string | null; secondary: string | null },
): Promise<AddressPackage> {
  // There has to be a nicer way to do this
  const video_ = [
    VAPI.AT1101.RTPTransmitter.VideoStreamerAsNamedTableRow,
    VAPI.AT1130.RTPTransmitter.VideoStreamerAsNamedTableRow,
  ];
  const audio_ = [
    VAPI.AT1101.RTPTransmitter.AudioStreamerAsNamedTableRow,
    VAPI.AT1130.RTPTransmitter.AudioStreamerAsNamedTableRow,
  ];
  const vm = hosting_vm(tx);
  const { primary, secondary } =
    rtp ??
    (await (async () => {
      const ifcs = await collect_rtp_interfaces(vm);
      return {
        primary: await get_ip(ifcs.primary),
        secondary: await get_ip(ifcs.secondary),
      };
    })());
  const split = (maybe_addr: null | string) =>
    maybe_addr && !!splitIPv4Address(maybe_addr) ? maybe_addr.split(".") : null;

  // not sure we should require address existence on both ports (what if one isn't connected and we're using
  // DHCP? Is it really a good idea to fill in 127.0.0.1 instead?)
  const assemble = (
    maybe_addr: null | string,
    index: number,
    udp_port: number,
  ): string => {
    const parts = split(maybe_addr);
    if (parts) return `239.${parts[2]}.${parts[3]}.${index}:${udp_port}`;
    enforce(
      false,
      `This implementation of make_mcast requires valid source addresses on all physical network interfaces`,
    );
  };

  const index = tx.index;
  if (video_.some((t) => tx instanceof t)) {
    return {
      media: {
        primary: assemble(primary, index, 9020),
        secondary: assemble(secondary, index, 9020),
      },
      meta: {
        primary: assemble(
          primary,
          vm.r_t_p_transmitter!.runtime_constants.num_videotransmitters + index,
          9040,
        ),
        secondary: assemble(
          secondary,
          vm.r_t_p_transmitter!.runtime_constants.num_videotransmitters + index,
          9040,
        ),
      },
    };
  }
  if (audio_.some((t) => tx instanceof t)) {
    return {
      media: {
        primary: assemble(
          primary,
          2 * vm.r_t_p_transmitter!.runtime_constants.num_videotransmitters +
            index,
          9030,
        ),
        secondary: assemble(
          secondary,
          2 * vm.r_t_p_transmitter!.runtime_constants.num_videotransmitters +
            index,
          9030,
        ),
      },
    };
  }
  throw new Error("Failed to infer Transmitters Media Types");
}

/**
 * True iff the essence carries ancillary metadata: closed captions, or any of
 * the LTC/VITC1/VITC2 timecode tracks.
 */
async function has_meta(
  src: VAPI.AT1130.Video.Essence | VAPI.AT1101.Video.Essence,
) {
  // Short-circuit on closed captions before touching the media clock,
  // preserving the original evaluation order.
  if (await src.cc.read()) return true;
  // Read the timecode once; the original issued three identical awaited reads,
  // one per track.
  const tc = await src.media_clock.time_code.read();
  return !!(tc?.ltc || tc?.vtc1 || tc?.vtc2);
}

/**
 * Streams out an arbitrary essence by dispatching on its runtime type to the
 * matching specialized helper. Throws for unrecognized essence types.
 */
export async function stream_out(
  src:
    | VAPI.AT1130.Video.Essence
    | VAPI.AT1101.Video.Essence
    | VAPI.AT1130.Audio.Essence
    | VAPI.AT1101.Audio.Essence,
) {
  // Video essences -> video transmitter.
  if (src instanceof VAPI.AT1130.Video.Essence) return stream_video(src);
  if (src instanceof VAPI.AT1101.Video.Essence) return stream_video(src);
  // Audio essences -> audio transmitter.
  if (src instanceof VAPI.AT1130.Audio.Essence) return stream_audio(src);
  if (src instanceof VAPI.AT1101.Audio.Essence) return stream_audio(src);
  throw new Error(
    "Failed to determine fitting Transmitter type; Use a more specific function instead.",
  );
}

/**
 * Returns the first usable IPv4 address found across the given interfaces,
 * skipping empty entries and IPv6 link-local ("fe80...") addresses; null when
 * none qualifies.
 */
async function get_ip(ifc: VAPI.NetworkInterfaces.VirtualInterface[]) {
  for (const iface of ifc) {
    const rows = await iface.ip_addresses.rows();
    for (const row of rows) {
      const candidate = await row.ip_address.read();
      if (
        candidate &&
        !candidate.startsWith("fe80") &&
        !!splitIPv4Address(candidate)
      )
        return candidate;
    }
  }
  return null;
}

/**
 * Creates a fully configured video transmitter (plus a fresh hosting session)
 * for the given essence and activates it.
 *
 * @param src essence to stream
 * @param pars optional overrides: transport format (default ST2110-20/GPM),
 *   multicast addresses (default derived via make_mcast), whether to constrain
 *   to the current/overridden standard (default true), VANC passthrough flags,
 *   interface selection, and an embedded-audio source.
 * @returns the newly created, activated video transmitter row
 * @throws when the machine has no RTP transmitter, or when neither a primary
 *   nor a secondary RTP interface can be resolved
 */
export async function stream_video(
  src: VAPI.Any.Video.Essence,
  pars?: {
    transport_format?: PartialVideoFormat;
    addresses?: AddressPackage;
    constrain?: boolean;
    vanc?: VAPI.Definitions.BypassReplaceFlags;
    interface_selector?: InterfaceSelector;
    a_src?: VAPI.Any.Audio.Essence;
    vid_std_override?: VAPI.Video.Standard;
  },
): Promise<
  | VAPI.AT1130.RTPTransmitter.VideoStreamerAsNamedTableRow
  | VAPI.AT1101.RTPTransmitter.VideoStreamerAsNamedTableRow
> {
  const constrain = pars?.constrain ?? true;
  const transport_format = pars?.transport_format ?? {
    variant: "ST2110_20",
    value: {
      add_st2110_40: true,
      packing_mode: "GPM",
      transmit_scheduler_uhd: false,
    },
  };
  const vid_std_override = pars?.vid_std_override ?? null;
  const vm = hosting_vm(src);
  enforce(!!vm.r_t_p_transmitter);
  const ifcs = await collect_rtp_interfaces(vm, {
    interface_selector: pars?.interface_selector,
  });
  const session = enforce_nonnull(
    await vm.r_t_p_transmitter?.sessions.create_row(),
  );
  // NOTE: we'll no longer 'enforce_nonnull', so we can set up non-sps flows by simply blanking
  // out the corresponding part of pars.interface_selector
  enforce(
    !!ifcs.primary[0] || !!ifcs.secondary[0],
    `Using the ${
      !!pars?.interface_selector ? "specified" : "default"
    } interface selector, no primary or secondary RTP interfaces could be found on ${vm.raw.identify()}`,
  );
  // BUGFIX: this write was previously not awaited (floating promise), so the
  // session's interfaces could still be unset while configuration proceeded.
  await session.interfaces.command.write({
    primary: ifcs.primary[0],
    secondary: ifcs.secondary[0],
  });

  const vtx = enforce_nonnull(
    await vm.r_t_p_transmitter?.video_transmitters.create_row(),
  );
  await vtx.configuration.transport_format.command.write(
    transport_defaults(transport_format),
  );
  await vtx.generic.hosting_session.command.write(session);
  // Read the standard once and reuse it for both the bandwidth and the
  // standard constraint (previously two separate reads that could disagree).
  const src_standard = await src.standard.read();
  await vtx.constraints.max_bandwidth.command.write(
    is_higher_than_3G(src_standard ?? "HD1080i50") ? "b12_0Gb" : "b3_0Gb",
  );
  await vtx.constraints.standard.command.write(
    constrain ? vid_std_override ?? src_standard : null,
  );
  await vtx.v_src.command.write(video_ref(src));
  const addresses =
    pars?.addresses ??
    (await make_mcast(vtx, {
      primary: await get_ip(ifcs.primary),
      secondary: await get_ip(ifcs.secondary),
    }));
  await vtx.generic.ip_configuration.video.primary.dst_address.command.write(
    addresses.media.primary,
  );
  await vtx.generic.ip_configuration.video.secondary.dst_address.command.write(
    addresses.media.secondary,
  );
  // Meta addresses are optional; blank them out when absent.
  await vtx.generic.ip_configuration.meta.primary.dst_address.command.write(
    addresses.meta?.primary ?? "",
  );
  await vtx.generic.ip_configuration.meta.secondary.dst_address.command.write(
    addresses.meta?.secondary ?? "",
  );
  // Pass existing timecode through; otherwise generate a free-running one.
  await vtx.configuration.vanc.timecode_inserter.command.write(
    (await has_meta(src))
      ? {
          variant: "Passthrough",
          value: {},
        }
      : {
          variant: "Freerun",
          value: {
            enable: { ltc: true, vitc: true },
            initial_timecode: { frames: 0, hours: 0, minutes: 0, seconds: 0 },
          },
        },
  );
  // Default to passing through every known VANC class; caller flags win.
  await vtx.configuration.vanc.passthrough_c_y_0.command.write({
    y_obs: true,
    c_unknown: true,
    y_334_vbi: true,
    y_2020_amd: true,
    y_334_cea_608: true,
    y_334_program: true,
    y_rdd_8_op_47: true,
    y_334_cea_708_cdp: true,
    y_2031_dvb_scte_vbi: true,
    y_2010_ansi_scte_104: true,
    y_unknown: true,
    y_334_data: true,
    ...pars?.vanc,
  });

  await vtx.configuration.embedded_audio.command.write(
    (await vtx.configuration.embedded_audio.status.read()).fill("Embed"),
  );
  await vtx.configuration.a_src.command.write(audio_ref(pars?.a_src ?? null));

  await force_activate_hosting_session(vtx);
  return vtx;
}

// Options for stream_audio (callers pass Partial<StreamAudioPars>).
type StreamAudioPars = {
  // Transport format; stream_audio defaults to L24 / p0_125 when omitted.
  format: VAPI.RTPTransmitter.AudioConfigurationFormat;
  // Multicast override; when absent, addresses are derived via make_mcast.
  addresses?: AddressPackage;
  // Forwarded to collect_rtp_interfaces to pick the RTP interfaces used.
  interface_selector?: InterfaceSelector;
};

/**
 * Creates a fully configured audio transmitter (plus a fresh hosting session)
 * for the given essence and activates it.
 *
 * @param src audio essence to stream
 * @param params optional overrides: transport format (defaults to L24 with
 *   0.125 ms packets over all of src's channels), multicast addresses
 *   (default derived via make_mcast), and interface selection.
 * @returns the newly created, activated audio transmitter row
 * @throws when the machine has no RTP transmitter, or when an interface list
 *   is non-empty but yields no usable source IP
 */
export async function stream_audio(
  src: VAPI.AT1130.Audio.Essence | VAPI.AT1101.Audio.Essence,
  params?: Partial<StreamAudioPars>,
) {
  const format = params?.format ?? {
    format: "L24",
    num_channels: (await src.channels.read()).length,
    packet_time: "p0_125",
  };
  const vm = hosting_vm(src);
  enforce(!!vm.r_t_p_transmitter);
  const ifcs = await collect_rtp_interfaces(vm, {
    interface_selector: params?.interface_selector,
  });
  const session = enforce_nonnull(
    await vm.r_t_p_transmitter?.sessions.create_row(),
  );
  // BUGFIX: this write was previously not awaited (floating promise), so the
  // session's interfaces could still be unset while configuration proceeded.
  await session.interfaces.command.write({
    primary: ifcs.primary[0] ?? null,
    secondary: ifcs.secondary[0] ?? null,
  });

  const atx = enforce_nonnull(
    await vm.r_t_p_transmitter?.audio_transmitters.create_row(),
  );
  await atx.configuration.transport_format.command.write(format);
  await atx.generic.hosting_session.command.write(session);
  await atx.a_src.command.write(audio_ref(src));
  // Derive multicast addresses unless explicitly supplied; a populated
  // interface list without a usable source IP is a hard error here.
  const addresses =
    params?.addresses ??
    (await make_mcast(atx, {
      primary:
        ifcs.primary.length !== 0
          ? enforce_nonnull(await get_ip(ifcs.primary), "primary IP address")
          : null,
      secondary:
        ifcs.secondary.length !== 0
          ? enforce_nonnull(
              await get_ip(ifcs.secondary),
              "secondary IP address",
            )
          : null,
    }));
  await atx.generic.ip_configuration.media.primary.dst_address.command.write(
    addresses.media.primary,
  );
  await atx.generic.ip_configuration.media.secondary.dst_address.command.write(
    addresses.media.secondary,
  );
  await force_activate_hosting_session(atx);
  return atx;
}
