diff options
| author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2026-05-10 19:06:21 +0200 |
|---|---|---|
| committer | Sander Vrijders <sander@ouroboros.rocks> | 2026-05-20 08:17:07 +0200 |
| commit | 63d3aa9ab8d8b0b6d8a10362e112a431dcb5b4e9 (patch) | |
| tree | 88f0827466b40d0e83da7954123d00cbb5f6c676 /src/tools/oftp/oftp.c | |
| parent | f33769c818cb1f01079405f543b36aa294764112 (diff) | |
| download | ouroboros-63d3aa9ab8d8b0b6d8a10362e112a431dcb5b4e9.tar.gz ouroboros-63d3aa9ab8d8b0b6d8a10362e112a431dcb5b4e9.zip | |
lib: Update FRCP implementation
The Flow and Retransmission Control Protocol (FRCP) runs end-to-end
between two peers over a flow. It provides reliability, in-order
delivery, flow control, and liveness. Note that congestion avoidance
is orthogonal to FRCP and handled in the IPCP.
A fixed 16-octet header, network byte order, is prefixed to every FRCP
packet:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| flags | hcs |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| window |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| seqno |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| ackno |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| payload (variable) ...
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
hcs is a CRC-16-CCITT-FALSE checksum over the PCI (and the stream
extension when present), verified before any flag-driven dispatch. A
single packet can simultaneously carry DATA + ACK + FC + RXM by OR-ing
flag bits. An optional CRC trailer covers the body on DATA when qs.ber
== 0, and on every SACK packet; an optional AEAD wrap (per-flow keys)
sits outermost.
Flag bits (MSB-first; bits 13..15 reserved, MUST be zero):
+------+--------+--------+----------------------------------------+
| Bit | Mask | Name | Meaning |
+------+--------+--------+----------------------------------------+
| 0 | 0x8000 | DATA | Carries caller payload |
| 1 | 0x4000 | DRF | Start of a fresh data run |
| 2 | 0x2000 | ACK | ackno field valid |
| 3 | 0x1000 | NACK | Pre-DRF nudge (seqno informational) |
| 4 | 0x0800 | FC | window field valid (rwe advertisement) |
| 5 | 0x0400 | RDVS | Rendezvous probe (window-closed) |
| 6 | 0x0200 | FFGM | First Fragment of a multi-fragment SDU |
| 7 | 0x0100 | LFGM | Last Fragment of a multi-fragment SDU |
| 8 | 0x0080 | RXM | Retransmission |
| 9 | 0x0040 | SACK | Block list follows in payload |
| 10 | 0x0020 | RTTP | RTT probe / echo (payload follows) |
| 11 | 0x0010 | KA | Keepalive |
| 12 | 0x0008 | FIN | End of stream marker |
| 13-15| -- | -- | Reserved (MUST be zero) |
+------+--------+--------+----------------------------------------+
(FFGM, LFGM) encodes the fragment role of a DATA packet (SCTP-style
B/E): 11=SOLE, 10=FIRST, 00=MID, 01=LAST. Each fragment carries its
own seqno; Retransmission recovers fragments individually, reassembly
runs at consume time. In stream mode FFGM/LFGM are unused; per-byte
position is carried by the stream extension below and end-of-stream is
signalled by FIN on a 0-byte DATA packet.
SACK payload (FRCT_ACK | FRCT_FC | FRCT_SACK):
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| n_blocks | padding (2 octets) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| start[0] |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| end[0] |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
... n_blocks pairs total ...
Each block describes a *present* (received) range strictly above the
cumulative ACK in the PCI ackno. D-SACK (RFC 2883) is signalled
in-band as block[0] - no flag bit, no extra framing - and consumed by
the RACK reo_wnd_mult scaler (RFC 8985 sec. 7.2).
RTTP payload (FRCT_RTTP only; 24 octets):
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| probe_id |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| echo_id |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| |
+ nonce (16 octets, echoed verbatim) +
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Stream PCI extension (in_order == STREAM only; 8 octets after the base
PCI on every DATA packet):
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| start |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| end |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
start, end are monotonic 32-bit byte offsets; end - start equals the
on-wire payload length. Stream mode is negotiated at flow allocation;
the extension is present iff stream mode is in use, never on a
per-packet basis.
Service modes are an orthogonal (in_order, loss, ber) vector selected
at flow_alloc; the cubes above map to the axes:
+----------------+---------+------+-----+-----------------------+
| Cube | in_order| loss | ber | Engaged |
+----------------+---------+------+-----+-----------------------+
| qos_raw | 0 | 1 | 1 | Raw passthrough |
| qos_raw_safe | 0 | 1 | 0 | Raw + CRC trailer |
| qos_rt | 1 | 1 | 1 | FRCP, no FRTX, no CRC |
| qos_rt_safe | 1 | 1 | 0 | FRCP, no FRTX, CRC |
| qos_msg | 1 | 0 | 0 | FRCP + FRTX |
| qos_stream | 2 | 0 | 0 | FRCP + FRTX, stream |
+----------------+---------+------+-----+-----------------------+
in_order=0 sends raw datagrams with no PCI (UDP-equivalent);
in_order=1 engages FRCP with SDU framing; in_order=2 (stream) requires
loss=0 and is rejected otherwise. loss=0 engages the FRTX retransmit
machinery. ber=0 appends the CRC-32 trailer; QOS_DISABLE_CRC at build
time forces ber=1 for development. Encryption is a separate per-flow
attribute layered as an AEAD wrap outside the FRCP packet.
Heritage: delta-t (Watson 1981) supplies timer-based connection
management - no SYN/FIN handshake, the DRF marker, the t_mpl / t_a /
t_r timers. RINA (Day 2008) supplies the unified flow_alloc(name, qos,
...) primitive and the orthogonal QoS-cube axes. Loss detection
follows TCP/QUIC practice (RFCs 2018, 2883, 6582, 6298, 8985); RTT
probing is nonce-authenticated like QUIC PATH_CHALLENGE.
Adds oftp, a minimal file-transfer tool over an FRCP stream flow. The
client reads from stdin or --in FILE and writes through a
flow_alloc(qos_stream); the server (--listen) calls flow_accept and
writes to stdout or --out FILE. Both sides compute a CRC-64/NVMe over
the bytes they handle and print the result. The server rejects flows
whose negotiated qs.in_order != STREAM.
Two FRCP knobs are exposed via env vars on either side:
OFTP_FRCT_RTO_MIN fccntl FRCTSRTOMIN (ns)
OFTP_FRCT_STREAM_RING_SZ fccntl FRCTSRRINGSZ (octets)
The ocbr_client gains an OCBR_QOS env var to pick the cube the client
uses for flow_alloc; recognised values are raw, safe, rt, rt_safe,
msg, stream. Unknown values fall back to raw with a warning on
stderr. Without the env set behaviour is unchanged.
Removes the deprecated lib/timerwheel.c
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/tools/oftp/oftp.c')
| -rw-r--r-- | src/tools/oftp/oftp.c | 441 |
1 files changed, 441 insertions, 0 deletions
diff --git a/src/tools/oftp/oftp.c b/src/tools/oftp/oftp.c new file mode 100644 index 00000000..1ae99403 --- /dev/null +++ b/src/tools/oftp/oftp.c @@ -0,0 +1,441 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2026 + * + * A minimal file-transfer tool over an FRCT stream flow + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided + * with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + * OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#define _POSIX_C_SOURCE 200809L + +#include <ouroboros/crc64.h> +#include <ouroboros/dev.h> +#include <ouroboros/errno.h> +#include <ouroboros/fccntl.h> +#include <ouroboros/qos.h> + +#include <fcntl.h> +#include <inttypes.h> +#include <signal.h> +#include <stdbool.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> +#include <unistd.h> + +#define BUF_SIZE 16384 + +static volatile sig_atomic_t stop = 0; + +static void apply_rto_min_env(int fd) +{ + const char * env; + long v; + + env = getenv("OFTP_FRCT_RTO_MIN"); + if (env == NULL) + return; + v = strtol(env, NULL, 10); + if (v <= 0) + return; + if (fccntl(fd, FRCTSRTOMIN, (time_t) v) < 0) + fprintf(stderr, + "oftp: failed to set RTO_MIN=%ld ns\n", v); +} + +static void apply_stream_ring_sz_env(int fd) +{ + const char * env; + long v; + + env = getenv("OFTP_FRCT_STREAM_RING_SZ"); + if (env == NULL) + return; + v = strtol(env, NULL, 10); + if (v <= 0) + return; + if (fccntl(fd, FRCTSRRINGSZ, (size_t) v) < 0) + fprintf(stderr, + "oftp: failed to set STREAM_RING_SZ=%ld\n", v); +} + +static void on_signal(int signo) +{ + (void) signo; + stop = 1; +} + +static void usage(void) +{ + printf("Usage: oftp [OPTION]...\n" + "Stream-mode file transfer over an Ouroboros flow.\n\n" + " -l, --listen Run as the receiver (server)\n" + " -n, --name NAME Destination service name (client)\n" + " -i, --in FILE Read input from FILE (default stdin)\n" + " -o, --out FILE Write output to FILE (default stdout)\n" + " -N, --bytes SIZE Stop after SIZE bytes " + "(K/M/G suffix; client only)\n" + " --help Display this help text and exit\n"); +} + +static int parse_size(const char * s, size_t * out) +{ + char * end; + unsigned long v; + size_t mul; + + v = strtoul(s, &end, 0); + if (end == s) + return -1; + + mul = 1; + if (*end == 'k' || *end == 'K') + mul = 1024UL; + else if (*end == 'm' || *end == 'M') + mul = 1024UL * 1024UL; + else if (*end == 'g' || *end == 'G') + mul = 1024UL * 1024UL * 1024UL; + else if (*end != '\0') + return -1; + + *out = (size_t) v * mul; + return 0; +} + +static void report_xfer(const char * tag, + size_t total, + uint64_t crc, + const struct timespec * t0, + const struct timespec * t1) +{ + double elapsed_s; + double mib_per_s; + + elapsed_s = (t1->tv_sec - t0->tv_sec) + + (t1->tv_nsec - t0->tv_nsec) / 1e9; + if (elapsed_s <= 0.0) + elapsed_s = 1e-9; + + mib_per_s = ((double) total / (1024.0 * 1024.0)) / elapsed_s; + + fprintf(stderr, + "oftp: %s %zu bytes in %.3f s (%.2f MiB/s) " + "crc64=%016" PRIx64 "\n", + tag, total, elapsed_s, mib_per_s, crc); +} + +static int xfer_to_flow(int fd, FILE * in, size_t max_bytes) +{ + char buf[BUF_SIZE]; + size_t n; + size_t total; + size_t want; + size_t off; + ssize_t w; + uint64_t crc; + struct timespec t0; + struct timespec t1; + + total = 0; + crc = 0; + + clock_gettime(CLOCK_MONOTONIC, &t0); + + while (!stop) { + want = sizeof(buf); + if (max_bytes > 0 && max_bytes - total < want) + want = max_bytes - total; + if (want == 0) + break; + + n = fread(buf, 1, want, in); + if (n == 0) + break; + + crc64_nvme(&crc, buf, n); + + off = 0; + while (off < n) { + w = flow_write(fd, buf + off, n - off); + if (w < 0) { + fprintf(stderr, + "flow_write failed: %zd\n", w); + return 1; + } + off += (size_t) w; + total += (size_t) w; + } + } + + clock_gettime(CLOCK_MONOTONIC, &t1); + + if (ferror(in)) { + fprintf(stderr, "Input read error.\n"); + return 1; + } + + report_xfer("sent", total, crc, &t0, &t1); + return 0; +} + +static int xfer_from_flow(int fd, FILE * out) +{ + char buf[BUF_SIZE]; + size_t total; + ssize_t n; + uint64_t crc; + struct timespec timeout; + struct timespec t0; + struct timespec t1; + bool started; + + total = 0; + crc = 0; + started = false; + timeout.tv_sec = 1; + timeout.tv_nsec = 0; + + /* Short timeout so SIGTERM/SIGINT 'stop' is observed promptly. */ + fccntl(fd, FLOWSRCVTIMEO, &timeout); + + while (!stop) { + n = flow_read(fd, buf, sizeof(buf)); + if (n == 0) { + /* Clean EOF: peer sent EOS and we drained it. */ + clock_gettime(CLOCK_MONOTONIC, &t1); + fflush(out); + if (!started) + t0 = t1; + report_xfer("received", total, crc, &t0, &t1); + return 0; + } + if (n == -ETIMEDOUT) + continue; + if (n < 0) { + /* Peer aborted before EOS: partial transfer. */ + if (n == -EFLOWDOWN || n == -EFLOWPEER) { + fprintf(stderr, + "oftp: peer aborted at %zu B\n", + total); + return 2; + } + fprintf(stderr, + "flow_read failed: %zd\n", n); + return 1; + } + if (!started) { + clock_gettime(CLOCK_MONOTONIC, &t0); + started = true; + } + crc64_nvme(&crc, buf, (size_t) n); + if (fwrite(buf, 1, (size_t) n, out) != (size_t) n) { + fprintf(stderr, "Output write error.\n"); + return 1; + } + total += (size_t) n; + } + + /* Receiver was signalled (SIGINT/SIGTERM) before EOF. */ + fflush(out); + fprintf(stderr, "oftp: interrupted at %zu B\n", total); + return 2; +} + +static int server_main(const char * outpath) +{ + FILE * out = stdout; + int fd; + int ofd; + int rc; + qosspec_t qs; + + if (outpath != NULL) { + ofd = open(outpath, + O_WRONLY | O_CREAT | O_EXCL | O_NOFOLLOW, + 0600); + if (ofd < 0) { + perror("open"); + return 1; + } + out = fdopen(ofd, "wb"); + if (out == NULL) { + perror("fdopen"); + close(ofd); + unlink(outpath); + return 1; + } + } + + fprintf(stderr, "oftp: listening...\n"); + + fd = flow_accept(&qs, NULL); + if (fd < 0) { + fprintf(stderr, "flow_accept failed: %d\n", fd); + if (out != stdout) + fclose(out); + return 1; + } + + if (qs.service != SVC_STREAM) { + fprintf(stderr, + "oftp: rejecting non-stream flow (service=%u)\n", + qs.service); + flow_dealloc(fd); + if (out != stdout) { + fclose(out); + unlink(outpath); + } + return 1; + } + + apply_rto_min_env(fd); + apply_stream_ring_sz_env(fd); + + rc = xfer_from_flow(fd, out); + + flow_dealloc(fd); + + if (out != stdout) { + fclose(out); + /* Drop the half-written file on abort/interrupt. */ + if (rc != 0) + unlink(outpath); + } + + return rc; +} + +static int client_main(const char * name, + const char * inpath, + size_t max_bytes) +{ + FILE * in; + int fd; + int rc; + qosspec_t qs; + + in = stdin; + qs = qos_stream; + + if (inpath != NULL) { + in = fopen(inpath, "rb"); + if (in == NULL) { + perror("fopen"); + return 1; + } + } + + fd = flow_alloc(name, &qs, NULL); + if (fd < 0) { + fprintf(stderr, "flow_alloc failed: %d\n", fd); + if (in != stdin) + fclose(in); + return 2; + } + + apply_rto_min_env(fd); + apply_stream_ring_sz_env(fd); + + rc = xfer_to_flow(fd, in, max_bytes); + + flow_dealloc(fd); + + if (in != stdin) + fclose(in); + + return rc; +} + +int main(int argc, char ** argv) +{ + bool server; + const char * name; + const char * inpath; + const char * outpath; + size_t max_bytes; + struct sigaction sa; + + server = false; + name = NULL; + inpath = NULL; + outpath = NULL; + max_bytes = 0; + + memset(&sa, 0, sizeof(sa)); + sa.sa_handler = on_signal; + sigaction(SIGINT, &sa, NULL); + sigaction(SIGTERM, &sa, NULL); + signal(SIGPIPE, SIG_IGN); + + argc--; argv++; + while (argc > 0) { + if (strcmp(*argv, "-l") == 0 || + strcmp(*argv, "--listen") == 0) { + server = true; + } else if ((strcmp(*argv, "-n") == 0 || + strcmp(*argv, "--name") == 0) && argc > 1) { + name = *(++argv); argc--; + } else if ((strcmp(*argv, "-i") == 0 || + strcmp(*argv, "--in") == 0) && argc > 1) { + inpath = *(++argv); argc--; + } else if ((strcmp(*argv, "-o") == 0 || + strcmp(*argv, "--out") == 0) && argc > 1) { + outpath = *(++argv); argc--; + } else if ((strcmp(*argv, "-N") == 0 || + strcmp(*argv, "--bytes") == 0) && argc > 1) { + if (parse_size(*(++argv), &max_bytes) < 0) { + fprintf(stderr, + "oftp: bad size '%s'\n", *argv); + return 1; + } + argc--; + } else if (strcmp(*argv, "--help") == 0) { + usage(); + return 0; + } else { + usage(); + return 1; + } + argc--; argv++; + } + + if (server) + return server_main(outpath); + + if (name == NULL) { + usage(); + return 1; + } + + return client_main(name, inpath, max_bytes); +} |
