// actor_distributed_pkg.sv
//
// Transport bridge templates — the distribution leg of Chapter 6. A bridge actor
// wraps a serialization + transport implementation behind the standard Actor
// interface, so distributing the topology across processes / machines becomes
// a per-stream choice rather than a global rewrite.
//
// Selection strategy (from the distribution analysis in Chapter 6):
//
//   inproc      SV mailbox          ~5 ns      existing actor_pkg core
//   intra-mach  Iceoryx zero-copy   ~50 ns     kernel-bypass shared memory
//   inter-mach  libfabric / RDMA    ~700 ns    kernel-bypass IB / RoCE
//   ergonomic   ZeroMQ              ~1-10 us   easy default, polyglot
//   durable     NATS JetStream      ~1 ms      recorder/replay durability
//
// The four out-of-process backends (every row except inproc) are sketched
// here as DPI-C wrappers. The C-side implementation files (zmq_dpi.c,
// iceoryx_dpi.cpp, libfabric_dpi.c, nats_dpi.c) are the user's to provide;
// they are not included.
// Compile only the bridge(s) you need and link the corresponding library.

package actor_distributed_pkg;

import actor_pkg::*;

// Tag identifying which transport a bridge instance uses. Stored on
// TransportBridgeActor::transport by the bridge constructor.
typedef enum {
  TRANSPORT_INPROC,     // SV mailbox, same process
  TRANSPORT_ICEORYX,    // shared memory zero-copy
  TRANSPORT_LIBFABRIC,  // RDMA cross-machine
  TRANSPORT_ZMQ,        // ZeroMQ inproc/ipc/tcp
  TRANSPORT_NATS_JS     // NATS JetStream (durable)
} TransportClass_e;

// ---------------------------------------------------------------------------
// DPI imports — the C side is provided by the user's chosen backend(s).
// Each backend exposes an init + send + recv trio; the exact argument lists
// differ per backend (see the individual declarations).
// ---------------------------------------------------------------------------

// ZeroMQ: separate publisher / subscriber initialisation. The topic is passed
// on every publish and once at subscribe time.
import "DPI-C" function void zmq_dpi_init_pub (string endpoint);
import "DPI-C" function void zmq_dpi_pub_send (string topic, byte unsigned bytes[]);
import "DPI-C" function void zmq_dpi_init_sub (string endpoint, string topic);
import "DPI-C" function int  zmq_dpi_sub_recv (output byte unsigned bytes[]);

// Iceoryx: publisher / subscriber are addressed by (service, instance_id).
import "DPI-C" function void iceoryx_dpi_init_pub (string service, string instance_id);
import "DPI-C" function void iceoryx_dpi_pub_send (byte unsigned bytes[]);
import "DPI-C" function void iceoryx_dpi_init_sub (string service, string instance_id);
import "DPI-C" function int  iceoryx_dpi_sub_recv (output byte unsigned bytes[]);

// libfabric: a single init covers both directions.
import "DPI-C" function void libfabric_dpi_init (string provider, string node, string svc);
import "DPI-C" function void libfabric_dpi_send (byte unsigned bytes[]);
import "DPI-C" function int  libfabric_dpi_recv (output byte unsigned bytes[]);

// NATS JetStream: a single init covers both directions.
import "DPI-C" function void nats_js_dpi_init      (string url, string subject);
import "DPI-C" function void nats_js_dpi_publish   (byte unsigned bytes[]);
import "DPI-C" function int  nats_js_dpi_subscribe (output byte unsigned bytes[]);

// NOTE: the bridge run() loop in this package treats a receive return value
// > 0 as "a frame was delivered in `bytes`" and anything else as "nothing
// available". What else the C side encodes in the return value is
// backend-defined.

// ---------------------------------------------------------------------------
// TransportBridgeActor (abstract) — common shape for every backend.
// Subclasses provide `serialize` / `deserialize` for their wire format.
// ---------------------------------------------------------------------------
virtual class TransportBridgeActor extends Actor;
  TransportClass_e transport;        // backend tag, set by the constructor
  string           endpoint;         // backend-specific address
  string           topic;            // backend-specific topic / subject
  int              batch_size = 1;   // amortize DPI cost
  MsgBase          outbox[$];        // messages waiting for the next flush()

  // Records the backend tag and addressing; opens no transport itself.
  function new(string name, TransportClass_e t, string ep, string top);
    super.new(name);
    this.transport = t;
    this.endpoint  = ep;
    this.topic     = top;
  endfunction

  // Backend hooks: push one serialized frame / poll for one inbound frame.
  // run() treats a recv_bytes() result > 0 as "frame available".
  pure virtual function void send_bytes (byte unsigned bytes[]);
  pure virtual function int  recv_bytes (output byte unsigned bytes[]);

  // No default wire layout exists --- the concrete subclass picks it
  // (e.g. Cap'n Proto). An un-overridden serialize would silently send
  // zero-length frames, so fail loudly instead.
  virtual function void serialize(MsgBase msg, output byte unsigned bytes[]);
    bytes = new[0];
    $fatal(1, "%s: serialize() not overridden --- no default wire layout", name);
  endfunction

  // Default inbound decoder: returns null, which run() interprets as
  // "nothing to publish" --- inbound frames are discarded until a subclass
  // overrides this.
  virtual function MsgBase deserialize(byte unsigned bytes[]);
    return null;
  endfunction

  // Outbound side: wire this bridge to local actors via `WIRE for each
  // specific message type it can serialize.
  //
  // Messages are queued and flushed once batch_size of them are pending.
  // Nothing in this class flushes a partial batch; call flush() explicitly
  // if batch_size > 1 and the tail must go out.
  virtual task act(MsgBase msg);
    outbox.push_back(msg);
    if (outbox.size() < batch_size) return;
    flush();
  endtask

  // Serialize and send every queued message in arrival order, then empty
  // the queue.
  function void flush();
    for (int k = 0; k < outbox.size(); k++) begin
      byte unsigned frame[];
      serialize(outbox[k], frame);
      send_bytes(frame);
    end
    outbox.delete();
  endfunction

  // run() serves both directions. The base Actor loop is the ONLY caller
  // of act(), so replacing it outright would orphan the outbound side:
  // everything `WIRE'd to the bridge would queue in mbox forever. Fork the
  // inherited drain loop alongside the inbound transport poll.
  virtual task run();
    fork
      // Local mailbox -> transport.
      begin : outbound
        MsgBase queued;
        forever begin
          mbox.get(queued);
          act(queued);
        end
      end
      // Transport -> local subscribers.
      begin : inbound
        byte unsigned frame[];
        MsgBase       decoded;
        forever begin
          int got = recv_bytes(frame);
          if (got <= 0) begin
            #1ns;   // backoff when empty
            continue;
          end
          decoded = deserialize(frame);
          if (decoded != null) publish(decoded);
        end
      end
    join
  endtask
endclass

// ---------------------------------------------------------------------------
// Concrete bridges — minimal wiring of the DPI calls. Each lives behind the
// same TransportBridgeActor surface so the rest of the topology is unaware
// of which backend is in play.
//
// These four classes pass dynamic-array actuals (`byte unsigned bytes[]`) to
// the DPI open-array formals above. That is legal IEEE-1800 DPI-C and is
// accepted by VCS / Questa / Xcelium, but Verilator 5.x has not implemented
// it (V3Task: "Passing dynamic array or queue as actual argument to DPI open
// array is not yet supported"). Verilator auto-defines `VERILATOR`, so the
// guard below hides only the concrete bridges from Verilator's lint pass
// while real simulators still compile them. The abstract TransportBridgeActor
// and the DPI import declarations stay visible to Verilator for syntax checks.
// ---------------------------------------------------------------------------
`ifndef VERILATOR

// ZeroMQ bridge: publishes on `topic`, receives from the subscriber socket.
// NOTE(review): the same `ep` string is handed to both the publisher and the
// subscriber init calls; whether the default wildcard address is valid for
// the subscriber side depends on the user's C implementation --- confirm.
class ZmqBridgeActor extends TransportBridgeActor;

  function new(string name = "ZmqBridge",
               string ep   = "tcp://*:5555",
               string top  = "actor.bus");
    super.new(name, TRANSPORT_ZMQ, ep, top);
    zmq_dpi_init_pub(ep);
    zmq_dpi_init_sub(ep, top);
  endfunction : new

  virtual function void send_bytes(byte unsigned bytes[]);
    zmq_dpi_pub_send(this.topic, bytes);
  endfunction : send_bytes

  virtual function int recv_bytes(output byte unsigned bytes[]);
    int status;
    status = zmq_dpi_sub_recv(bytes);
    return status;
  endfunction : recv_bytes

endclass : ZmqBridgeActor

// Iceoryx bridge: (service, instance) are also stored in the inherited
// endpoint / topic fields by the base constructor.
class IceoryxBridgeActor extends TransportBridgeActor;
  string service;
  string instance_name;

  function new(string name = "IceoryxBridge",
               string svc  = "actor",
               string inst = "bus");
    super.new(name, TRANSPORT_ICEORYX, svc, inst);
    this.service       = svc;
    this.instance_name = inst;
    iceoryx_dpi_init_pub(svc, inst);
    iceoryx_dpi_init_sub(svc, inst);
  endfunction : new

  virtual function void send_bytes(byte unsigned bytes[]);
    iceoryx_dpi_pub_send(bytes);
  endfunction : send_bytes

  virtual function int recv_bytes(output byte unsigned bytes[]);
    int status;
    status = iceoryx_dpi_sub_recv(bytes);
    return status;
  endfunction : recv_bytes

endclass : IceoryxBridgeActor

// libfabric bridge: `node` / `svc` land in the inherited endpoint / topic.
class LibfabricBridgeActor extends TransportBridgeActor;

  function new(string name     = "LibfabricBridge",
               string provider = "verbs",
               string node     = "",
               string svc      = "");
    super.new(name, TRANSPORT_LIBFABRIC, node, svc);
    libfabric_dpi_init(provider, node, svc);
  endfunction : new

  virtual function void send_bytes(byte unsigned bytes[]);
    libfabric_dpi_send(bytes);
  endfunction : send_bytes

  virtual function int recv_bytes(output byte unsigned bytes[]);
    int status;
    status = libfabric_dpi_recv(bytes);
    return status;
  endfunction : recv_bytes

endclass : LibfabricBridgeActor

// NATS JetStream bridge: `url` / `subject` land in endpoint / topic.
class NatsJsBridgeActor extends TransportBridgeActor;

  function new(string name    = "NatsJsBridge",
               string url     = "nats://localhost:4222",
               string subject = "actor.bus");
    super.new(name, TRANSPORT_NATS_JS, url, subject);
    nats_js_dpi_init(url, subject);
  endfunction : new

  virtual function void send_bytes(byte unsigned bytes[]);
    nats_js_dpi_publish(bytes);
  endfunction : send_bytes

  virtual function int recv_bytes(output byte unsigned bytes[]);
    int status;
    status = nats_js_dpi_subscribe(bytes);
    return status;
  endfunction : recv_bytes

endclass : NatsJsBridgeActor

`endif // VERILATOR

endpackage
// actor_lifecycle_pkg.sv
//
// Operational facilities every non-trivial actor system needs:
//
//   ActorRegistry    — process registry: name -> handle (Erlang's `register/2`)
//   TimerActor       — send_after / send_periodic — scheduled message dispatch
//   DeadLetterActor  — captures undeliverable messages for diagnostics
//   StartupSequence  — orders actor start-up (e.g. monitor before driver)
//
// TimerActor and DeadLetterActor are themselves actors, so they participate
// in supervision and observability; ActorRegistry and StartupSequence are
// plain utility classes (no mailbox, no lifecycle).

package actor_lifecycle_pkg;

import actor_pkg::*;

// ---------------------------------------------------------------------------
// ActorRegistry — lookup table from canonical name to actor handle.
// Static class (no instances) so any actor in the topology can resolve
// a peer by name without explicit dependency injection.
// ---------------------------------------------------------------------------
class ActorRegistry;
  static Actor by_name[string];
  static Actor by_id  [int];

  // Add `a` under both its name and its id.
  // Fatal on a null handle or on a name that is already registered.
  static function void register(Actor a);
    if (a == null) begin
      $fatal(1, "ActorRegistry: cannot register a null actor");
      return;  // never dereference null, even if the tool keeps running
    end
    if (by_name.exists(a.name))
      $fatal(1, "ActorRegistry: duplicate name '%s'", a.name);
    by_name[a.name] = a;
    by_id  [a.id]   = a;
  endfunction

  // Remove `a` from both tables. A null handle is ignored. An entry is only
  // removed when it actually refers to `a`, so unregistering an actor that
  // was never registered cannot evict a different actor that happens to
  // share its name or id.
  static function void unregister(Actor a);
    if (a == null) return;
    if (by_name.exists(a.name) && by_name[a.name] == a) by_name.delete(a.name);
    if (by_id.exists(a.id)     && by_id[a.id]     == a) by_id.delete(a.id);
  endfunction

  // Handle registered under `name`, or null when there is none.
  static function Actor lookup(string name);
    if (by_name.exists(name)) return by_name[name];
    return null;
  endfunction

  // Handle registered under `id`, or null when there is none.
  static function Actor lookup_by_id(int id);
    if (by_id.exists(id)) return by_id[id];
    return null;
  endfunction

  // Number of registered names.
  static function int size();
    return by_name.size();
  endfunction
endclass

// ---------------------------------------------------------------------------
// TimerActor — schedule one-shot or periodic message dispatch.
//
//   send_after(target, msg, delay_ns)      — fire once after delay
//   send_periodic(target, msg, period_ns)  — fire repeatedly
//
// Cancellation is by token returned from the schedule call. Deliveries are
// stamped with the TimerActor's id. A periodic schedule re-delivers the
// SAME message handle each tick — retaining subscribers see N aliases of
// one object; schedule a fresh message per tick if that matters.
// ---------------------------------------------------------------------------
typedef struct {
  int              token;
  Actor            target;
  MsgBase          msg;
  longint unsigned delay_ns;
  longint unsigned period_ns;  // 0 = one-shot
  bit              active;
} TimerEntry_s;

class TimerActor extends Actor;
  // token -> entry. An entry exists only while its timer thread is alive;
  // it is removed when the timer fires (one-shot) or observes a cancel.
  TimerEntry_s timers[int];
  int          next_token = 1;

  function new(string name = "TimerActor");
    super.new(name);
  endfunction

  // Deliver `msg` to `target` once, `delay_ns` from now.
  // Returns the cancellation token, or 0 (never a valid token) when
  // `target` or `msg` is null.
  function int send_after(Actor target, MsgBase msg, longint unsigned delay_ns);
    return schedule(target, msg, delay_ns, 0);
  endfunction

  // Deliver `msg` to `target` every `period_ns`, first delivery one period
  // from now. A period of 0 degenerates to an immediate one-shot (period 0
  // is the one-shot marker). Returns the cancellation token, or 0 when
  // `target` or `msg` is null.
  function int send_periodic(Actor target, MsgBase msg, longint unsigned period_ns);
    return schedule(target, msg, period_ns, period_ns);
  endfunction

  // Mark a timer inactive. The timer thread notices at its next wake-up,
  // skips the delivery, and removes the entry. Unknown tokens are ignored.
  function void cancel(int token);
    if (timers.exists(token)) timers[token].active = 0;
  endfunction

  // Timer thread body: sleep, re-check for cancellation, deliver, repeat
  // for periodic entries. Always removes the entry on exit so `timers`
  // does not grow without bound over a long simulation.
  task run_timer(int token);
    forever begin
      TimerEntry_s e;
      if (!timers.exists(token)) return;
      e = timers[token];
      if (!e.active) break;
      #(e.delay_ns * 1ns);
      // The entry may have been cancelled while we slept.
      if (!timers.exists(token) || !timers[token].active) break;
      e.msg.stamp(this.id);  // lineage: timer deliveries are not anonymous
      if (!e.target.mbox.try_put(e.msg))
        $warning("%s: timer %0d delivery to '%s' dropped (mailbox full)",
                 name, token, e.target.name);
      if (e.period_ns == 0) break;
    end
    timers.delete(token);
  endtask

  // Shared implementation of send_after / send_periodic: allocate a token,
  // record the entry, and spawn its timer thread.
  protected function int schedule(Actor target, MsgBase msg,
                                  longint unsigned delay_ns,
                                  longint unsigned period_ns);
    TimerEntry_s e;
    int          tok;
    if (target == null || msg == null) begin
      $error("%s: timer scheduled with a null target or message; ignored", name);
      return 0;
    end
    tok         = next_token++;
    e.token     = tok;
    e.target    = target;
    e.msg       = msg;
    e.delay_ns  = delay_ns;
    e.period_ns = period_ns;
    e.active    = 1;
    timers[tok] = e;
    fork
      run_timer(tok);
    join_none
    return tok;
  endfunction
endclass

// ---------------------------------------------------------------------------
// DeadLetterActor — catches
// messages that had nowhere to go.
//
// Other actors call `record(msg, reason)` when they detect an undeliverable
// envelope. Useful diagnostic for "why is my coverage stuck" — usually a
// wiring mistake that drops messages silently.
// ---------------------------------------------------------------------------
typedef struct {
  string           type_name;
  string           reason;
  longint unsigned timestamp;
  int unsigned     from_id;
} DeadLetterEntry_s;

class DeadLetterActor extends Actor;
  DeadLetterEntry_s log[$];

  function new(string name = "DeadLetters");
    super.new(name);
  endfunction

  // Append one entry describing an undeliverable message, timestamped with
  // the current $time. A null `msg` is itself recorded (type "<null>",
  // sender 0) rather than crashing the diagnostics path.
  function void record(MsgBase msg, string reason);
    DeadLetterEntry_s e;
    e.reason    = reason;
    e.timestamp = $time;
    if (msg == null) begin
      e.type_name = "<null>";
      e.from_id   = 0;
    end else begin
      e.type_name = msg.getTypeName();
      e.from_id   = msg.sender_id;
    end
    log.push_back(e);
  endfunction

  // Print the entry count followed by one line per recorded entry.
  function void report();
    $display("[DeadLetters] %0d entries", log.size());
    foreach (log[i]) begin
      $display("  [%0t] from=%0d type=%s reason=%s",
               log[i].timestamp, log[i].from_id,
               log[i].type_name, log[i].reason);
    end
  endfunction
endclass

// ---------------------------------------------------------------------------
// StartupSequence — bring actors up in a defined order, with optional gate
// messages between phases. Ensures monitors are running before drivers start
// emitting transactions, RAL shadow ready before scoreboards check, etc.
// ---------------------------------------------------------------------------
class StartupSequence;
  Actor phases[$][$];   // phases[phase_idx][actor_idx]

  // Append one phase; its actors are started together, in queue order.
  function void add_phase(Actor actors[$]);
    phases.push_back(actors);
  endfunction

  // Start every phase in the order added. When phase_gap_ns > 0 the task
  // waits that long after each phase, including the last one. Null handles
  // inside a phase are reported and skipped.
  task run(longint unsigned phase_gap_ns = 0);
    foreach (phases[p]) begin
      foreach (phases[p][i]) begin
        if (phases[p][i] == null)
          $error("StartupSequence: null actor at phase %0d index %0d; skipped", p, i);
        else
          phases[p][i].start();
      end
      if (phase_gap_ns > 0) #(phase_gap_ns * 1ns);
    end
  endtask
endclass

endpackage
// actor_observability_pkg.sv // // Production observability primitives. None of them modify the actor // topology. Tracer, LatencyHistogram, and StructuredLog are passive // subscribers attached via typed `WIRE — one `WIRE per observed type, which // keeps the topology fully explicit in the wiring code. MailboxMetricsActor // is the exception: a poller registered via track() that samples queue // depths on a timer and publishes MailboxSample_s records (no wiring edge). // // MailboxMetricsActor — mailbox depth per actor, Prometheus-style gauges // TracerActor — emits OpenTelemetry-style span records from // MsgBase.trace_id chains // LatencyHistogramActor — bucketed histograms keyed by message type // StructuredLogActor — JSON / Apache-Arrow-friendly event emission package actor_observability_pkg; import actor_pkg::*; // --------------------------------------------------------------------------- // MailboxMetricsActor — sample mailbox depths periodically. Exports a snapshot // suitable for Prometheus scraping (when paired with a DPI exporter). 
// ---------------------------------------------------------------------------
typedef struct {
  string           actor_name;
  int              depth;
  longint unsigned total_received;
  longint unsigned timestamp;
} MailboxSample_s;

class MailboxMetricsActor extends Actor;
  Actor            tracked[$];
  MailboxSample_s  history[$];
  longint unsigned sample_period_ns = 100_000;  // 100 us

  function new(string name = "MailboxMetrics");
    super.new(name);
  endfunction

  // Add an actor to the sampled set. Null handles are rejected here so the
  // sampling loop never dereferences one.
  function void track(Actor a);
    if (a == null) begin
      $error("%s: track() called with a null actor; ignored", name);
      return;
    end
    tracked.push_back(a);
  endfunction

  // Sampling loop: every sample_period_ns, record and publish one
  // MailboxSample_s per tracked actor. The period is re-read each
  // iteration, so it can be changed while running.
  virtual task run();
    bit              warned = 0;
    longint unsigned period;
    forever begin
      period = sample_period_ns;
      if (period == 0) begin
        // A zero period would make this a zero-delay forever loop and hang
        // the simulation at the current time step; fall back to 1 ns.
        if (!warned) begin
          $warning("%s: sample_period_ns is 0; sampling every 1 ns instead", name);
          warned = 1;
        end
        period = 1;
      end
      #(period * 1ns);
      foreach (tracked[i]) begin
        MailboxSample_s s;
        s.actor_name     = tracked[i].name;
        s.depth          = tracked[i].mbox.num();
        s.total_received = 0;  // depth-only today: the core Actor keeps no
                               // received counter, so throughput is not measured
        s.timestamp      = $time;
        history.push_back(s);
        `PUBLISH(s);
      end
    end
  endtask

  // Print the sample count followed by one line per recorded sample.
  function void dump();
    $display("[Metrics] %0d samples", history.size());
    foreach (history[i])
      $display("  [%0t] %s depth=%0d",
               history[i].timestamp, history[i].actor_name, history[i].depth);
  endfunction
endclass

// ---------------------------------------------------------------------------
// TracerActor — every observed message contributes one span. Spans are
// stitched into a trace via trace_id; parent_span gives causal ordering.
//
// Output shape mirrors OpenTelemetry's Span: { trace_id, span_id, parent_id,
// service_name, operation, start, end }. A DPI exporter can ship to
// Jaeger / Tempo / SigNoz without modifying actor logic.
// ---------------------------------------------------------------------------
typedef struct {
  longint unsigned trace_id;
  longint unsigned span_id;
  longint unsigned parent_span;
  string           service_name;
  string           operation;
  longint unsigned start_ns;
  longint unsigned end_ns;
} Span_s;

class TracerActor extends Actor;
  Span_s spans[$];

  function new(string name = "Tracer");
    super.new(name);
  endfunction

  // Record one span per observed message.
  virtual task act(MsgBase msg);
    Span_s s;
    s.trace_id     = msg.trace_id;
    s.span_id      = msg.timestamp_ns;   // span_id = emission ts:
                                         // unique only when stages
                                         // have nonzero latency;
                                         // zero-delay chains share it
    s.parent_span  = msg.parent_span;
    s.service_name = "actor";            // override per-actor
    s.operation    = msg.getTypeName();
    s.start_ns     = msg.timestamp_ns;
    s.end_ns       = $time;
    spans.push_back(s);
  endtask

  // Write every recorded span to `path`, one JSON object per line.
  // Reports an error and writes nothing when the file cannot be opened.
  function void export_jsonl(string path);
    int fd;
    fd = $fopen(path, "w");
    if (fd == 0) begin
      $error("%s: export_jsonl cannot open '%s' for writing", name, path);
      return;
    end
    foreach (spans[i]) begin
      $fwrite(fd, "{\"trace\":%0d,\"span\":%0d,\"parent\":%0d,\"op\":\"%s\",\"start\":%0d,\"end\":%0d}\n",
              spans[i].trace_id, spans[i].span_id, spans[i].parent_span,
              spans[i].operation, spans[i].start_ns, spans[i].end_ns);
    end
    $fclose(fd);
  endfunction
endclass

// ---------------------------------------------------------------------------
// LatencyHistogramActor — per-message-type histogram of receive latency.
// Subclass and override `extract_latency_ns()` to compute latency from your
// domain message type (e.g. response.timestamp - request.timestamp).
// ---------------------------------------------------------------------------
virtual class LatencyHistogramActor extends Actor;
  // Upper bounds (inclusive) of the finite buckets, in ns. A latency above
  // the last bound lands in the overflow bucket, index $size(buckets_ns).
  longint unsigned buckets_ns[8] = '{
    100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000, 100_000_000, 1_000_000_000
  };

  // Nested associative on the bucket index — avoids fixed-size sub-array
  // increment patterns that some simulators can't lower cleanly.
  int bucket_counts[string][int];

  function new(string name = "LatencyHistogram");
    super.new(name);
  endfunction

  pure virtual function longint unsigned extract_latency_ns(MsgBase msg);

  // Count `msg` in the first bucket whose bound is >= its latency.
  virtual task act(MsgBase msg);
    longint unsigned lat      = extract_latency_ns(msg);
    string           t        = msg.getTypeName();
    int              n_bounds = $size(buckets_ns);
    int              b        = 0;
    int              cur;
    while (b < n_bounds && lat > buckets_ns[b]) b++;
    cur = bucket_counts[t].exists(b) ? bucket_counts[t][b] : 0;
    bucket_counts[t][b] = cur + 1;
  endtask

  // Print the non-empty buckets (overflow bucket included) per message type.
  function void report();
    foreach (bucket_counts[t]) begin
      $display("[Histogram] %s", t);
      for (int b = 0; b <= $size(buckets_ns); b++)
        if (bucket_counts[t].exists(b))
          $display("  bucket[%0d]: %0d", b, bucket_counts[t][b]);
    end
  endfunction
endclass

// ---------------------------------------------------------------------------
// StructuredLogActor — emit one JSON line per observed message. Replaces
// unstructured $display logs with a parquet/json/elasticsearch-friendly
// stream. Subclass to override `serialize()` for domain-specific payload
// shaping.
// ---------------------------------------------------------------------------
class StructuredLogActor extends Actor;
  int fd;   // 0 when the log is not open (open failed, or already closed)

  function new(string name = "StructuredLog", string path = "actor_log.jsonl");
    super.new(name);
    fd = $fopen(path, "w");
  endfunction

  // Render one message as a single-line JSON object.
  virtual function string serialize(MsgBase msg);
    return $sformatf("{\"ts\":%0d,\"trace\":%0d,\"from\":%0d,\"type\":\"%s\"}",
                     msg.timestamp_ns, msg.trace_id, msg.sender_id, msg.getTypeName());
  endfunction

  // Append one line per observed message while the log is open.
  virtual task act(MsgBase msg);
    if (fd != 0) $fwrite(fd, "%s\n", serialize(msg));
  endtask

  // Close the log and clear the descriptor, so a message arriving after
  // termination (or a second on_terminate call) cannot touch a closed file.
  virtual function void on_terminate();
    if (fd != 0) $fclose(fd);
    fd = 0;
  endfunction
endclass

endpackage
// actor_patterns_pkg.sv
//
// Standard idioms borrowed from Akka and Erlang/OTP that aren't in the core:
//
//   Ask           — request/reply with a per-call reply mailbox (Future-style)
//   Stash         — defer messages while in a "not ready" mode, replay on ready
//   Become        — push/pop message-handler stack for state-driven actors
//   SelectiveRecv — wait for one message type, re-queuing the other types
//
// Each is a mixin-style base class. Inherit from it instead of Actor when you
// want that pattern. They are independent — combine them by composition rather
// than multiple inheritance (SV does not support MI).

package actor_patterns_pkg;

import actor_pkg::*;

// ---------------------------------------------------------------------------
// AskActor — request/reply pattern.
//
// Caller invokes `ask(target, request_msg, reply_out)`. A private mailbox is
// created per call, attached to the request, and the caller blocks on it.
// Target actor reads `reply_mbox` from the message envelope (see unpack())
// and `try_put`s its reply there. This avoids polluting the caller's main
// mailbox.
// ---------------------------------------------------------------------------
class AskMsg extends MsgBase;
  MsgBase             request;
  mailbox #(MsgBase)  reply_mbox;   // depth 1: exactly one reply expected

  function new(MsgBase req);
    request    = req;
    reply_mbox = new(1);
  endfunction

  virtual function string getTypeName();
    // Same key convention as Msg#(T): the string `WIRE captures via
    // $typename is the string publish() routes on.
    return $typename(AskMsg);
  endfunction
endclass

class AskActor extends Actor;
  function new(string name = "AskActor", int capacity = 0);
    super.new(name, capacity);
  endfunction

  // Send `request` to `target`, block until reply received or timeout.
  // On timeout `reply` is null; on success it holds the reply message.
  // A null `target` / `request` is reported and yields a null reply at once.
  // If the target mailbox refuses the envelope, that is reported and the
  // call still consumes the full timeout before returning null, so retry
  // loops keep advancing time.
  //
  // Implementation note: Verilator does not allow writing to a captured
  // `output` argument from inside a fork that contains a timing control,
  // so we route through a local intermediate and assign to `reply` after
  // the fork joins.
  virtual task ask(Actor target, MsgBase request,
                   output MsgBase reply,
                   input longint unsigned timeout_ns = 1_000_000);
    AskMsg  envelope;
    MsgBase local_reply = null;
    bit     got_reply   = 0;

    reply = null;
    if (target == null || request == null) begin
      $error("%s: ask() called with a null target or request", name);
      return;
    end

    envelope = new(request);
    envelope.stamp(this.id);
    // The envelope is discarded by unpack(); carry its lineage onto the
    // payload the server sees, or tracing fractures at every ask hop.
    request.trace_id    = envelope.trace_id;
    request.parent_span = envelope.timestamp_ns;

    if (!target.mbox.try_put(envelope)) begin
      $warning("%s: ask() could not enqueue request (target mailbox full)", name);
      #(timeout_ns * 1ns);
      return;
    end

    // Race the reply against the timeout. The outer fork/join gives the
    // race its own process so `disable fork` kills only these two branches.
    // Disabling a named block instead would terminate every concurrent
    // activation of ask() (other callers, other AskActor instances) and
    // hand them spurious timeouts.
    fork
      begin
        fork
          begin
            envelope.reply_mbox.get(local_reply);
            got_reply = 1;
          end
          begin
            #(timeout_ns * 1ns);
          end
        join_any
        disable fork;
      end
    join

    // The timeout branch may win a same-time-step race against a reply that
    // is already sitting in the mailbox; pick it up rather than lose it.
    if (!got_reply && envelope.reply_mbox.try_get(local_reply)) got_reply = 1;

    reply = got_reply ? local_reply : null;
  endtask

  // Helper for the receiving side: extract request and a reply handle.
  // Always writes to the outputs (null on cast failure) so callers see
  // well-defined values regardless of input.
  static function void unpack(MsgBase msg,
                              output MsgBase request,
                              output mailbox #(MsgBase) reply_mbox);
    AskMsg env;
    request    = null;
    reply_mbox = null;
    if ($cast(env, msg)) begin
      request    = env.request;
      reply_mbox = env.reply_mbox;
    end
  endfunction
endclass

// ---------------------------------------------------------------------------
// StashActor — defer messages while not ready (e.g. during reset, init).
//
// Subclass calls `stash(msg)` to set aside a message, `unstash_all()` to
// replay them through act() once ready.
// ---------------------------------------------------------------------------
class StashActor extends Actor;
  MsgBase stashed[$];

  function new(string name = "StashActor", int capacity = 0);
    super.new(name, capacity);
  endfunction

  // Set a message aside for later replay.
  function void stash(MsgBase msg);
    stashed.push_back(msg);
  endfunction

  // Replay the messages stashed so far through act(), oldest first.
  //
  // Works on a snapshot: anything act() stashes again during the replay
  // (the actor turned out not to be ready after all) stays stashed for the
  // next unstash_all() call. Draining the live queue instead would spin
  // forever on a message that keeps re-stashing itself.
  task unstash_all();
    MsgBase replay[$];
    replay = stashed;
    stashed.delete();
    foreach (replay[i]) act(replay[i]);
  endtask

  // Number of messages currently set aside.
  function int stashed_depth();
    return stashed.size();
  endfunction
endclass

// ---------------------------------------------------------------------------
// BecomeActor — handler stack. Push a new behavior with `become(handler_id)`,
// pop with `unbecome()`. Subclasses override `dispatch(state, msg)`.
//
// This replaces sprinkling `if (state == X)` checks throughout act() with an
// explicit state-dispatch table.
// ---------------------------------------------------------------------------
virtual class BecomeActor extends Actor;
  int behavior_stack[$];   // front = current behavior

  function new(string name = "BecomeActor", int initial_behavior = 0, int capacity = 0);
    super.new(name, capacity);
    behavior_stack.push_front(initial_behavior);
  endfunction

  // Behavior on top of the stack.
  function int current_behavior();
    return behavior_stack[0];
  endfunction

  // Push a behavior; it stays current until unbecome().
  function void become(int new_behavior);
    behavior_stack.push_front(new_behavior);
  endfunction

  // Pop back to the previous behavior. The bottom (initial) behavior is
  // never popped.
  function void unbecome();
    if (behavior_stack.size() > 1) void'(behavior_stack.pop_front());
  endfunction

  pure virtual task dispatch(int behavior, MsgBase msg);

  // Route every message to dispatch() with the current behavior.
  virtual task act(MsgBase msg);
    dispatch(current_behavior(), msg);
  endtask
endclass

// ---------------------------------------------------------------------------
// SelectiveReceiveActor — drain the mailbox into a working queue, then
// pattern-match by type, processing the next message that matches a filter
// and putting the rest back. Mirrors Erlang's selective receive.
// ---------------------------------------------------------------------------
class SelectiveReceiveActor extends Actor;

  function new(string name = "SelectiveReceiveActor", int capacity = 0);
    super.new(name, capacity);
  endfunction

  // Block until a message whose getTypeName() equals `wanted_type` shows up
  // and hand it back through `out_msg`; `out_msg` is null when the wait
  // times out. Everything else pulled from the mailbox in the meantime is
  // appended back to it afterwards, so deferred messages keep their order
  // among themselves but end up behind anything that arrived during the
  // wait. The timeout is a count of 1 ns idle polls and therefore does not
  // depend on the enclosing timescale.
  task receive_only(string wanted_type,
                    output MsgBase out_msg,
                    input longint unsigned timeout_ns = 1_000_000);
    MsgBase          deferred[$];
    MsgBase          candidate;
    longint unsigned idle_polls = 0;
    bit              finished   = 0;

    out_msg = null;

    while (!finished) begin
      if (mbox.num() != 0) begin
        mbox.get(candidate);
        if (candidate.getTypeName() == wanted_type) begin
          out_msg  = candidate;
          finished = 1;
        end else begin
          deferred.push_back(candidate);
        end
      end else if (idle_polls >= timeout_ns) begin
        finished = 1;          // budget exhausted with an empty mailbox
      end else begin
        #1ns;
        idle_polls++;
      end
    end

    // Put the skipped messages back. A bounded mailbox may have refilled
    // while we waited; losing a deferred message quietly would defeat the
    // pattern, so each loss is reported.
    for (int k = 0; k < deferred.size(); k++) begin
      if (mbox.try_put(deferred[k]) == 0)
        $error("%s: receive_only dropped a deferred %s (mailbox full)",
               name, deferred[k].getTypeName());
    end
  endtask

endclass

endpackage
// actor_persistence_pkg.sv // // Record / replay infrastructure — the record/replay leg of Chapter 6's // persistence discussion. Because every message is an immutable struct on a // known topic, // the recorder is a passive subscriber and the replayer is a passive // publisher. This collapses non-deterministic regression bugs into // deterministic 30-second reproducers. // // RecorderActor — taps any pub/sub stream, writes (ts, type, bytes) // ReplayActor — reads a log, republishes with original timing // EventSourcedActor — base class that reconstructs state from event log // // The on-disk format is intentionally simple text-CSV. A production version // pairs this with a binary serialization (e.g. Cap'n Proto), supplied via // TransportBridgeActor::serialize overrides in actor_distributed_pkg, for // cross-language replay. package actor_persistence_pkg; import actor_pkg::*; // --------------------------------------------------------------------------- // RecorderActor — every received message is logged with timestamp, sender, // and a textual rendering of the payload. Subclasses can override // `serialize_payload()` to switch to binary, Cap'n Proto, etc. 
// ---------------------------------------------------------------------------
class RecorderActor extends Actor;
  int fd;      // 0 when the trace file is not open
  int count;   // rows written so far

  // Open `path` for writing and emit the CSV header. If the file cannot be
  // opened the recorder stays inert (act() writes nothing); that is reported
  // once here instead of silently producing no trace.
  function new(string name = "Recorder", string path = "actor_trace.csv");
    super.new(name);
    fd    = $fopen(path, "w");
    count = 0;
    if (fd != 0)
      $fwrite(fd, "ts_ns,trace_id,sender_id,type_name,payload\n");
    else
      $warning("%s: cannot open '%s' for writing; nothing will be recorded", name, path);
  endfunction

  // Textual rendering of the message used for the payload column.
  virtual function string serialize_payload(MsgBase msg);
    return $sformatf("%p", msg);
  endfunction

  // Append one CSV row per received message.
  // NOTE(review): type_name and payload are written unescaped; a type name
  // containing a comma would shift the columns for a reader that splits on
  // commas --- confirm the type names in use are comma-free.
  virtual task act(MsgBase msg);
    if (fd == 0) return;
    $fwrite(fd, "%0d,%0d,%0d,%s,%s\n",
            msg.timestamp_ns, msg.trace_id, msg.sender_id,
            msg.getTypeName(), serialize_payload(msg));
    count++;
  endtask

  // Close the trace file and clear the descriptor.
  virtual function void on_terminate();
    if (fd != 0) $fclose(fd);
    fd = 0;
  endfunction
endclass

// ---------------------------------------------------------------------------
// ReplayActor — reads a recorded log and republishes envelopes with the
// recorded inter-arrival timing. Subclasses override `deserialize` to
// reconstruct domain typed messages from the textual payload column.
//
// Replay is bit-deterministic when:
//   1. The DUT is reset to the same starting state
//   2. All non-replayed actors are deterministic (no $urandom outside replay)
//   3.
//      The transport preserves ordering (true for in-process mailboxes)
// ---------------------------------------------------------------------------
virtual class ReplayActor extends Actor;
  string path;   // trace file read by run()

  function new(string name = "Replay", string trace_path = "actor_trace.csv");
    super.new(name);
    path = trace_path;
  endfunction

  // Rebuild a typed message from one row's type_name / payload columns.
  // Returning null skips the row.
  pure virtual function MsgBase deserialize(string type_name, string payload);

  // Read the trace at `path` row by row and republish each row that parses
  // and deserializes, sleeping for the recorded timestamp delta between
  // consecutive rows. Rows that do not have four commas or whose numeric
  // columns do not parse are skipped.
  virtual task run();
    int              log_fd;
    string           row;
    longint unsigned prev_ts   = 0;
    bit              first_row = 1;

    log_fd = $fopen(path, "r");
    if (log_fd == 0) begin
      $display("[Replay %s] cannot open %s", name, path);
      return;
    end

    // skip header
    void'($fgets(row, log_fd));

    while (!$feof(log_fd)) begin
      longint unsigned ts;
      longint unsigned trace_id;
      int unsigned     sender_id;
      string           type_name;
      string           payload;
      MsgBase          msg;
      int              sep[4];   // positions of the first four commas
      int              cursor;

      if ($fgets(row, log_fd) == 0) break;
      if (row == "" || row == "\n") continue;
      if (row[row.len()-1] == "\n") row = row.substr(0, row.len()-2);

      // Split on the first four commas by scanning. $sscanf %s cannot do
      // this: it is whitespace-delimited and greedy, and both the
      // getTypeName() strings and the %p payload rendering contain spaces
      // and commas. Everything after the fourth comma is the payload.
      cursor = 0;
      foreach (sep[k]) begin
        sep[k] = (cursor < 0) ? -1 : find_comma(row, cursor);
        cursor = (sep[k] < 0) ? -1 : sep[k] + 1;
      end
      if (sep[3] < 0) continue;

      if ($sscanf(row.substr(0, sep[0] - 1), "%d", ts) != 1) continue;
      if ($sscanf(row.substr(sep[0] + 1, sep[1] - 1), "%d", trace_id) != 1) continue;
      if ($sscanf(row.substr(sep[1] + 1, sep[2] - 1), "%d", sender_id) != 1) continue;
      type_name = row.substr(sep[2] + 1, sep[3] - 1);
      payload   = row.substr(sep[3] + 1, row.len() - 1);

      // Reproduce the recorded gap; the first row and non-increasing
      // timestamps are emitted without delay.
      if (!first_row && ts > prev_ts) #((ts - prev_ts) * 1ns);
      first_row = 0;
      prev_ts   = ts;

      msg = deserialize(type_name, payload);
      if (msg != null) begin
        // publish() re-stamps: only the trace_id restoration survives
        // (stamp keeps nonzero trace_ids); subscribers see replay-time
        // timestamps and the replayer as sender, by design.
        msg.trace_id = trace_id;
        publish(msg);
      end
    end

    $fclose(log_fd);
  endtask

  // Index of the next comma at or after `from`, or -1.
  protected function int find_comma(string s, int from);
    int pos = from;
    while (pos < s.len()) begin
      if (s[pos] == ",") return pos;
      pos++;
    end
    return -1;
  endfunction
endclass

// ---------------------------------------------------------------------------
// EventSourcedActor — state is the fold over its received message history.
// On startup, a recorded log is replayed *into the actor itself* to
// reconstruct prior state. Ordering relative to live traffic is the
// caller's responsibility: gate producers on `replaying`, or wire a done
// message (replay_from()'s fixed wait is a demo-grade heuristic).
//
// Use case: register-shadow actors that need to come up mid-simulation with
// the exact state the DUT holds (e.g. after a checkpoint restore).
// ---------------------------------------------------------------------------
virtual class EventSourcedActor extends Actor;
  bit replaying = 0;   // 1 while replay_from() is in progress

  function new(string name = "EventSourcedActor");
    super.new(name);
  endfunction

  // Fold one event into the actor's state.
  pure virtual function void apply_event(MsgBase msg);

  // Every received message is an event.
  virtual task act(MsgBase msg);
    apply_event(msg);
  endtask

  // Start `concrete_replay` on the log at `path` and hold `replaying` high
  // for a fixed 100 ns window.
  //
  //   path            — log to replay; when non-empty it overrides the path
  //                     the replayer was constructed with. An empty string
  //                     keeps the replayer's own path.
  //   concrete_replay — replayer to start; a null handle is reported and
  //                     the call returns without replaying.
  task replay_from(string path, ReplayActor concrete_replay);
    if (concrete_replay == null) begin
      $error("%s: replay_from() called with a null replayer", name);
      return;
    end
    replaying = 1;
    // Caller is responsible for wiring concrete_replay -> this with
    // `WIRE(concrete_replay, EventType, this) for each event type the
    // recorded log contains, BEFORE calling replay_from(). The
    // EventSourcedActor base class cannot do this wiring because the
    // payload types are domain-specific and only known to the subclass.
    if (path != "") concrete_replay.path = path;
    concrete_replay.start();
    // wait briefly for replay to drain — production code wires a "done" msg
    #100ns;
    replaying = 0;
  endtask
endclass

endpackage
// actor_pkg.sv
//
// Core actor framework. Every actor is an FSM with a typed input alphabet
// and typed output emissions. Topology is wired declaratively from outside
// the actor (a parent --- testbench, env, supervisor --- calls `WIRE), not
// imperatively from inside the actor body. This is the framework's defining
// property: producer code never references its consumers, consumer code
// never references its producers; both are wired together externally by
// message type, exactly like hardware modules connected by typed wires at
// the parent level.
//
// Subscription is type-indexed. A consumer that wires for Msg#(Transaction)
// receives only those messages from the wired producer; messages of other
// types from the same producer go to their own subscribers. No fan-out to
// uninterested consumers, no runtime filter at the receiver. There is no
// wildcard / subscribe-to-everything primitive in the base framework ---
// a tracer that wants to observe every message type from a producer wires
// for each type explicitly, which keeps the topology fully visible in the
// wiring code (no hidden edges).
//
// MsgBase carries causal lineage (trace_id, parent_span, sender_id,
// timestamp_ns) so OpenTelemetry-style cross-actor tracing works without
// retrofit.

package actor_pkg;

// ---------------------------------------------------------------------------
// Globally unique identifier sources
// ---------------------------------------------------------------------------
// Next actor id to hand out; Actor::new() post-increments it, so the first
// actor constructed gets id 1.
int unsigned _actor_next_id = 1;
// Next trace id to hand out; MsgBase::stamp() post-increments it for any
// message whose trace_id is still 0. Starting at 1 keeps 0 free to mean
// "no trace assigned yet".
longint unsigned _trace_next_id = 1;

// ---------------------------------------------------------------------------
// MsgBase: every envelope carries enough metadata to reconstruct causality
// across an arbitrary distributed actor topology.
// ---------------------------------------------------------------------------
virtual class MsgBase;
    longint unsigned trace_id     = 0;  // root cause identifier (propagated)
    longint unsigned parent_span  = 0;  // immediate causal ancestor's timestamp
    longint unsigned timestamp_ns = 0;  // emission time (set by stamp())
    int unsigned     sender_id    = 0;  // originating actor's id

    // Routing key for this envelope; publish() looks subscribers up by it.
    pure virtual function string getTypeName();

    // Called by publish() to lock in identity at emission.
    //
    //   from_actor - id of the emitting actor, stored in sender_id.
    //
    // sender_id and timestamp_ns are overwritten on every call. trace_id is
    // allocated only when it is still 0, so a lineage set by the caller
    // survives re-stamping.
    function void stamp(int unsigned from_actor);
        sender_id = from_actor;
        // $time is expressed in the time unit of the calling scope, so it is
        // nanoseconds only under a 1ns timeunit. Dividing $realtime by the
        // 1ns literal (scaled to the same unit) gives true nanoseconds for
        // any timeunit whose precision is 1ns or finer, which is what the
        // field name promises.
        timestamp_ns = longint'($realtime / 1ns);
        if (trace_id == 0) trace_id = _trace_next_id++;
    endfunction
endclass

// ---------------------------------------------------------------------------
// Typed message wrapper. getTypeName() returns $typename(T), which is the
// same string the `WIRE macro captures at the call site as the subscriber
// key, so producer.publish() looks up subscribers under the identical key.
// ---------------------------------------------------------------------------
class Msg #(type T = int) extends MsgBase;
    T payload;

    // Wraps `p`; lineage fields keep their MsgBase defaults until stamp().
    function new(T p);
        payload = p;
    endfunction

    virtual function string getTypeName();
        return $typename(T);
    endfunction

    // Checked downcast: returns the payload of `base` viewed as Msg#(T).
    // Ends the simulation with $fatal if `base` is null or is not a Msg#(T).
    static function T unwrap(MsgBase base);
        Msg#(T) typed_msg;
        if (base == null) begin
            $fatal(1, "Actor unwrap error: null msg passed (expected %s)",
                   $typename(T));
        end
        if (!$cast(typed_msg, base)) begin
            $fatal(1, "Actor type-cast error: expected %s, got %s",
                   $typename(T), base.getTypeName());
        end
        return typed_msg.payload;
    endfunction
endclass

// ---------------------------------------------------------------------------
// Universal Actor base class.
//
// subscribers_by_type[typename][i] -- consumers that asked for that type.
// publish() looks up by msg.getTypeName() and fans out only to those.
// No wildcard / subscribe-to-everything queue exists; topology is fully
// explicit in the wiring code.
//
// Override act() (single-message handler) or run() (full custom loop) in
// subclasses.
// ---------------------------------------------------------------------------
virtual class Actor;
    mailbox #(MsgBase) mbox;                // inbound queue drained by run()
    Actor subscribers_by_type [string][$];  // type-indexed routing
    string name;
    int unsigned id;                        // taken from _actor_next_id in new()
    int mbox_capacity = 0;                  // 0 = unbounded
    bit is_alive = 1;                       // cleared by stop(), set by start();
                                            // publish() skips actors with 0
    process p_run;                          // handle of the active run() process
    int unsigned run_gen = 0;               // start/stop generation; stale
                                            // run-forks self-cancel against it

    // Assigns the next global actor id and creates the mailbox.
    //   name     - stored in this.name.
    //   capacity - mailbox bound; values <= 0 create an unbounded mailbox.
    function new(string name = "Actor", int capacity = 0);
        this.name = name;
        this.id = _actor_next_id++;
        this.mbox_capacity = capacity;
        // Use if/else (not a ternary) because `new` is not accepted as a
        // ternary expression value in some simulators.
        if (capacity > 0) this.mbox = new(capacity);
        else this.mbox = new();
    endfunction

    // Typed subscriber registration. Invoked by the `WIRE macro, which
    // captures $typename(T) at the call site. Direct invocation is fine too,
    // but `WIRE keeps the wiring statement uniform across the codebase.
    // No de-duplication: registering the same subscriber twice for one type
    // stores it twice, so it is offered each message twice.
    virtual function void add_subscriber(string type_name, Actor sub);
        subscribers_by_type[type_name].push_back(sub);
    endfunction

    // Fan-out publish. Dispatches only to subscribers registered for the
    // message's specific type. Backed-up subscribers drop via try_put
    // rather than stalling the producer; use try_publish() for
    // backpressure-aware dispatch.
    // The message is stamped before the lookup, so its metadata is set even
    // when nobody is subscribed. Every subscriber receives the same handle,
    // not a copy.
    virtual function void publish(MsgBase msg);
        string tn;
        Actor q[$];
        msg.stamp(this.id);
        tn = msg.getTypeName();
        if (!subscribers_by_type.exists(tn)) return;
        q = subscribers_by_type[tn];
        foreach (q[i]) begin
            if (q[i].is_alive) void'(q[i].mbox.try_put(msg));
        end
    endfunction

    // Returns 1 only when every wired consumer accepted the message ---
    // gives the caller a backpressure signal it can act on.
    // Subscribers with is_alive == 0 are skipped and do not clear the
    // result; with no subscribers for the type the result is 1. Delivery is
    // not atomic: consumers that accepted keep the message even when a
    // later one refuses it.
    virtual function bit try_publish(MsgBase msg);
        bit all_ok = 1;
        string tn;
        Actor q[$];
        msg.stamp(this.id);
        tn = msg.getTypeName();
        if (!subscribers_by_type.exists(tn)) return all_ok;
        q = subscribers_by_type[tn];
        foreach (q[i]) begin
            if (q[i].is_alive) all_ok = all_ok & (q[i].mbox.try_put(msg) != 0);
        end
        return all_ok;
    endfunction

    // Per-message handler called by the default run() loop.
    virtual task act(MsgBase msg);
        // Override in subclass; default is a no-op sink.
    endtask

    virtual function void on_terminate();
        // Override for cleanup --- called by stop().
    endfunction

    // Default message loop: block on the mailbox, hand each message to
    // act(), repeat. Messages are handled strictly one at a time because
    // act() must return before the next get().
    virtual task run();
        MsgBase msg;
        forever begin
            mbox.get(msg);
            act(msg);
        end
    endtask

    // Marks the actor alive and spawns its run() loop without blocking the
    // caller.
    // NOTE(review): the generation check runs once, when the forked block
    // first executes. A second start() without an intervening stop() looks
    // like it leaves the earlier, already-running loop alive while p_run is
    // overwritten by the newer one -- confirm callers always pair start()
    // with stop().
    virtual function void start();
        is_alive = 1;
        run_gen++;
        // The forked block does not execute until the caller suspends (LRM
        // 9.3.2), so a stop() in the same time slice cannot kill it via p_run.
        // The generation token closes that race: my_gen is captured at fork
        // time, and a stale fork (out-lived by a newer start/stop) self-cancels
        // instead of becoming a second, unkillable run loop.
        fork
            automatic int unsigned my_gen = run_gen;
            begin
                if (my_gen == run_gen) begin
                    p_run = process::self();
                    run();
                end
            end
        join_none
    endfunction

    // Marks the actor dead, runs the cleanup hook, then kills the run loop.
    // The mailbox is left as-is: messages already queued stay queued.
    virtual function void stop();
        is_alive = 0;
        run_gen++;                  // cancel any not-yet-started fork
        on_terminate();             // cleanup before kill: a self-stop
                                    // from inside act() must still run it
        if (p_run != null) p_run.kill();
    endfunction
endclass

// ---------------------------------------------------------------------------
// Wiring and publishing macros.
//
// `_tmp` is declared `automatic` so the macro works from both class-method
// contexts (already automatic) and module-level initial/always blocks
// (static by default, where a non-automatic `new(...)` initializer would
// fire at time 0 and crash on null inputs).
// ---------------------------------------------------------------------------

// Declarative typed wiring: register CONSUMER as a subscriber to PRODUCER's
// emissions of payload type PAYLOAD_T.
// Called externally by the parent
// that owns the topology --- producer and consumer code stay agnostic of
// each other's existence. Reads as "wire PAYLOAD_T from PRODUCER to
// CONSUMER". This is the framework's only wiring primitive; a consumer
// that wants to observe multiple types from one producer issues one
// `WIRE per type.
// The expansion already ends in ';', so call sites need no trailing
// semicolon.
`define WIRE(PRODUCER, PAYLOAD_T, CONSUMER) \
    PRODUCER.add_subscriber($typename(PAYLOAD_T), CONSUMER);

// Emit a message. The producer's publish() looks up subscribers by the
// message's type name and fans out only to those.
// Expands to an unqualified publish() call, so it only resolves where a
// publish() is in scope (for example inside an Actor method). The envelope
// type is derived from the argument via type(DATA).
`define PUBLISH(DATA) \
    begin \
        automatic Msg#(type(DATA)) _tmp = new(DATA); \
        publish(_tmp); \
    end

// Direct mailbox put to a specific actor. Does NOT stamp metadata so it
// can be called from any context (including non-Actor classes and
// modules); bypasses the typed subscriber map entirely. For a traced
// direct send, stamp() the message yourself before the try_put (as
// AskActor::ask does); try_publish() routes via the subscriber map, not
// to a specific actor.
// The try_put() result is discarded: a full mailbox drops the message
// silently. The target's is_alive flag is not consulted.
`define PUBLISH_TO(ACTOR_INST, DATA) \
    begin \
        automatic Msg#(type(DATA)) _tmp = new(DATA); \
        void'(ACTOR_INST.mbox.try_put(_tmp)); \
    end

// Propagates the parent message's trace lineage to a new outbound envelope.
// trace_id is copied from the parent and parent_span is set to the parent's
// timestamp_ns before publish() stamps the new envelope. PARENT_MSG is
// expanded twice, so pass a plain handle rather than an expression with
// side effects.
`define PUBLISH_TRACED(DATA, PARENT_MSG) \
    begin \
        automatic Msg#(type(DATA)) _tmp = new(DATA); \
        _tmp.trace_id    = PARENT_MSG.trace_id; \
        _tmp.parent_span = PARENT_MSG.timestamp_ns; \
        publish(_tmp); \
    end

endpackage
// actor_pkg_all.sv // // Umbrella include — pulls in the core actor_pkg and every parallel extension // in one place. Use this when you want the full framework available; use the // individual packages when you want only the substrate or only specific // capabilities (the original 50-line core stands on its own). // // Typical user code: // `include "actor_pkg_all.sv" // import actor_pkg::*; // import actor_supervision_pkg::*; // import actor_routing_pkg::*; // ... // // The `include directives below assume all .sv files live in the same dir. // Adjust the search path or use simulator -y / +incdir+ as appropriate. `include "actor_pkg.sv" `include "actor_supervision_pkg.sv" `include "actor_routing_pkg.sv" `include "actor_patterns_pkg.sv" `include "actor_lifecycle_pkg.sv" `include "actor_observability_pkg.sv" `include "actor_verification_pkg.sv" `include "actor_persistence_pkg.sv" `include "actor_ral_pkg.sv" `include "actor_distributed_pkg.sv" `include "actor_test_pkg.sv"
// actor_ral_pkg.sv // // Register Abstraction Layer (RAL) as a framework primitive. // // This is intentionally NOT a port of UVM's uvm_reg / uvm_reg_field // hierarchy. UVM's RAL maintains four copies of every field's value // (Desired, Mirrored, Reset, Value) plus a predictor that updates the // mirrored copy from observed bus traffic. For CSR-heavy SoCs that is // gigabytes of testbench state shadowing state that already lives in // the RTL. Most of it is dead weight in the common case (read-this- // register-check-this-field). // // The actor-model RAL keeps a different bargain. Definitions are // immutable contract data: name, address, bit slice, access policy, // reset value. Current values are NOT shadowed -- they live in the // RTL (or in the IP-actor's slave-side backing store) and are read // via backdoor when a test needs them. The bus monitor's stream of // TlulMonPkt_s gets translated to symbolic RalEvent_s for downstream // coverage / trace / scoreboard subscribers; nothing reconstructs a // mirrored state because the mirrored state would just be a stale // copy of the RTL. // // Memories follow the same rule with even less state: a memory is // just a (base_addr, size, backdoor_root) triple. Reads and writes // pass through to the actual storage. // // API surface: // define_reg / define_field / define_mem -- populate the contract // addr_of(name) : symbolic name -> physical address // name_at(addr) : physical addr -> symbolic name (reverse) // field_info(name) : symbolic name -> (lsb, width, access, reset) // read_field(name) : backdoor pass-through to RTL field // write_field(name, v) : backdoor pass-through to RTL field // read_mem(name, off) : backdoor pass-through to memory cell // write_mem(name, off, v): backdoor pass-through to memory cell // // Auto-generation: appC_earlgrey/tools/reggen_actor.py emits the // define_* body from an OpenTitan-style Hjson register description, // the same way OpenTitan's `reggen --uvm` emits UVM RAL classes. 
package actor_ral_pkg;

import actor_pkg::*;

typedef enum logic [3:0] {
    RAL_RW  = 0,   // read/write
    RAL_RO  = 1,   // read-only  (writes ignored)
    RAL_W1C = 2,   // write 1 to clear (e.g. interrupt status)
    RAL_W0C = 3,   // write 0 to clear
    RAL_WO  = 4,   // write-only (reads return 0)
    RAL_RC  = 5    // read-clear (read returns then clears)
} ral_access_e;

typedef struct {
    string        name;
    logic [31:0]  addr;
    logic [31:0]  reset_value;
    ral_access_e  access;
} ral_reg_def_t;

typedef struct {
    string        reg_name;     // parent register name
    string        field_name;   // field name (unique within reg)
    int           lsb;
    int           width;
    ral_access_e  access;
    logic [31:0]  reset_value;
} ral_field_def_t;

typedef struct {
    string            name;
    logic [31:0]      base_addr;
    longint unsigned  size_bytes;
    string            backdoor_root;   // SV hierarchical path; empty if none
} ral_mem_def_t;

// Symbolic event published when the RAL's bus subscriber translates
// an observed TlulMonPkt_s into a known register access.
typedef struct {
    string            reg_name;
    logic [31:0]      addr;
    logic [31:0]      value;       // wdata for writes, rdata for reads
    bit               is_write;
    longint unsigned  timestamp_ns;
} RalEvent_s;

// Read/write handle abstraction for backdoor access. In real silicon
// testbenches this is a virtual interface to the DUT's hierarchical
// path. For the example DUTs we plug in a behavioral implementation
// that talks to the IP-actor's slave-side backing store; for real RTL
// you'd plug in a wrapper that uses SystemVerilog hierarchical refs.
// Subclass and override to do the actual access.
virtual class RalBackdoor;
    pure virtual function logic [31:0] read_addr(logic [31:0] addr);
    pure virtual function void write_addr(logic [31:0] addr, logic [31:0] value);
endclass

class RalActor extends Actor;
    // Forward maps
    ral_reg_def_t   regs   [string];
    ral_field_def_t fields [string];   // key: "<reg>.<field>"
    ral_mem_def_t   mems   [string];

    // Reverse map for fast bus-traffic decode
    string name_by_addr [logic [31:0]];

    // Backdoor handle. Optional: tests that don't need backdoor
    // access can leave it null. Set via attach_backdoor().
    RalBackdoor backdoor;

    // Block base address. Register-table addresses are stored as
    // block-relative offsets; addr_of() returns absolute addresses by
    // adding this base. Default 0 treats the table as absolute.
    logic [31:0] addr_offset;

    function new(string name = "RalActor");
        super.new(name);
        addr_offset = 32'h0;
    endfunction

    function void attach_backdoor(RalBackdoor bd);
        backdoor = bd;
    endfunction

    function void set_addr_offset(logic [31:0] base_addr);
        addr_offset = base_addr;
    endfunction

    // ---- Definition population ----

    // Registers (or re-registers) `name_` at block-relative address `addr`
    // and records the reverse addr -> name mapping.
    function void define_reg(string name_, logic [31:0] addr,
                             ral_access_e access,
                             logic [31:0] reset_value = 32'h0);
        ral_reg_def_t r;
        logic [31:0]  old_addr;

        // A register redefined at a different address must not keep
        // answering name_at() at its old address. Only retire the old entry
        // if it still belongs to this register (another register may have
        // claimed that address since).
        if (regs.exists(name_)) begin
            old_addr = regs[name_].addr;
            if (old_addr != addr && name_by_addr.exists(old_addr)
                    && name_by_addr[old_addr] == name_)
                name_by_addr.delete(old_addr);
        end

        r.name = name_;
        r.addr = addr;
        r.reset_value = reset_value;
        r.access = access;
        regs[name_] = r;
        name_by_addr[addr] = name_;
    endfunction

    // Stores a field contract under the key "<reg_name>.<field_name>".
    // The parent register is not required to exist yet; it is resolved when
    // the field is accessed.
    function void define_field(string reg_name, string field_name,
                               int lsb, int width, ral_access_e access,
                               logic [31:0] reset_value = 32'h0);
        ral_field_def_t f;
        f.reg_name = reg_name;
        f.field_name = field_name;
        f.lsb = lsb;
        f.width = width;
        f.access = access;
        f.reset_value = reset_value;
        fields[{reg_name, ".", field_name}] = f;
    endfunction

    // Stores a memory contract. A size_bytes of 0 means "size not
    // declared": read_mem()/write_mem() then skip the range check.
    function void define_mem(string name_, logic [31:0] base_addr,
                             longint unsigned size_bytes,
                             string backdoor_root = "");
        ral_mem_def_t m;
        m.name = name_;
        m.base_addr = base_addr;
        m.size_bytes = size_bytes;
        m.backdoor_root = backdoor_root;
        mems[name_] = m;
    endfunction

    // ---- Symbolic queries ----

    // Absolute address of a register; $fatal on an unknown name.
    function logic [31:0] addr_of(string reg_name);
        if (!regs.exists(reg_name))
            $fatal(1, "RalActor::addr_of: unknown register '%s'", reg_name);
        return regs[reg_name].addr + addr_offset;
    endfunction

    // Reverse lookup; returns "" when no register sits at that address.
    function string name_at(logic [31:0] absolute_addr);
        logic [31:0] reg_offset = absolute_addr - addr_offset;
        return name_by_addr.exists(reg_offset) ? name_by_addr[reg_offset] : "";
    endfunction

    // Field contract for "<reg>.<field>"; $fatal on an unknown key.
    function ral_field_def_t field_info(string field_qname);
        if (!fields.exists(field_qname))
            $fatal(1, "RalActor::field_info: unknown field '%s'", field_qname);
        return fields[field_qname];
    endfunction

    // Declared reset value, or 0 for an unknown register (no $fatal here).
    function logic [31:0] reset_value_of(string reg_name);
        return regs.exists(reg_name) ? regs[reg_name].reset_value : 32'h0;
    endfunction

    // ---- Backdoor access (no shadow state, no predictor) ----

    // Reads the parent register through the backdoor and returns the field,
    // right-aligned.
    function logic [31:0] read_field(string field_qname);
        ral_field_def_t f;
        logic [31:0] word;
        if (backdoor == null)
            $fatal(1, "RalActor::read_field: no backdoor attached");
        f = field_info(field_qname);
        // addr_of(), not the raw table entry: register addresses are stored
        // block-relative, and the backdoor speaks absolute addresses once
        // set_addr_offset() has been applied (exactly as read_reg does).
        word = backdoor.read_addr(addr_of(f.reg_name));
        return (word >> f.lsb) & ((1 << f.width) - 1);
    endfunction

    // Read-modify-write of one field through the backdoor; bits of `value`
    // above the field width are discarded by the mask.
    function void write_field(string field_qname, logic [31:0] value);
        ral_field_def_t f;
        logic [31:0] word, mask;
        if (backdoor == null)
            $fatal(1, "RalActor::write_field: no backdoor attached");
        f = field_info(field_qname);
        mask = ((1 << f.width) - 1) << f.lsb;
        word = backdoor.read_addr(addr_of(f.reg_name));
        word = (word & ~mask) | ((value << f.lsb) & mask);
        backdoor.write_addr(addr_of(f.reg_name), word);
    endfunction

    function logic [31:0] read_reg(string reg_name);
        if (backdoor == null)
            $fatal(1, "RalActor::read_reg: no backdoor attached");
        return backdoor.read_addr(addr_of(reg_name));
    endfunction

    function void write_reg(string reg_name, logic [31:0] value);
        if (backdoor == null)
            $fatal(1, "RalActor::write_reg: no backdoor attached");
        backdoor.write_addr(addr_of(reg_name), value);
    endfunction

    // Shared validation and address resolution for read_mem()/write_mem().
    // Reports each failure separately (unknown memory, missing backdoor,
    // offset outside the declared size) under the caller's name, then
    // returns the absolute address of the cell.
    protected function logic [31:0] resolve_mem_addr(string caller,
                                                     string mem_name,
                                                     logic [31:0] offset);
        if (!mems.exists(mem_name))
            $fatal(1, "RalActor::%s: unknown mem '%s'", caller, mem_name);
        if (backdoor == null)
            $fatal(1, "RalActor::%s: no backdoor attached", caller);
        // An offset past the declared size would silently land on whatever
        // follows the memory in the address map. size_bytes == 0 means the
        // size was never declared, so the access passes through unchecked.
        if (mems[mem_name].size_bytes != 0
                && offset >= mems[mem_name].size_bytes)
            $fatal(1, "RalActor::%s: offset 0x%0h is outside mem '%s' (%0d bytes)",
                   caller, offset, mem_name, mems[mem_name].size_bytes);
        return addr_offset + mems[mem_name].base_addr + offset;
    endfunction

    // Memory base addresses follow the register convention: stored
    // block-relative, made absolute with addr_offset here.
    function logic [31:0] read_mem(string mem_name, logic [31:0] offset);
        return backdoor.read_addr(resolve_mem_addr("read_mem", mem_name, offset));
    endfunction

    function void write_mem(string mem_name, logic [31:0] offset,
                            logic [31:0] value);
        backdoor.write_addr(resolve_mem_addr("write_mem", mem_name, offset), value);
    endfunction

    // ---- Bus-side observation ----
    // The RAL is wired to its IP's bus monitor via `WIRE(monitor, BusTxn,
    // ral) in the env, and re-publishes each observed transaction as a
    // symbolic RalEvent_s with the resolved register name. Subscribers
    // that care about specific registers (a coverage actor, a trace
    // recorder, a scoreboard hook) wire to the RalActor instead of
    // decoding the raw bus stream themselves.
    //
    // The raw struct passed into act() carries an `addr`, an `is_write`
    // flag, and a value. Any monitor-side struct that has these fields
    // can be plugged in; the canonical one is tlul_pkg::TlulMonPkt_s,
    // but the framework deliberately doesn't import it here so this
    // package stays bus-protocol-agnostic. See
    // earlgrey examples for the TL-UL adapter pattern.
    virtual task act(MsgBase msg);
        // Subclass per protocol to translate the raw bus packet into a
        // RalEvent_s. Default: no-op.
    endtask
endclass

endpackage
// actor_routing_pkg.sv
//
// Akka-style routers. A router is itself an Actor — messages it receives are
// forwarded to one or more of its routees according to a strategy.
//
// Routers compose: a RoundRobinRouter can have BroadcastRouter children, etc.
// The forwarding routers pass the original envelope through unmodified, so
// its trace lineage survives; ScatterGatherRouter re-publishes a combined
// message and copies the lineage from the gathered replies onto it.
//
// Strategies provided:
//   RoundRobinRouter     — cycle through routees, one message each
//   BroadcastRouter      — every routee receives every message (== publish)
//   RandomRouter         — uniform random pick
//   ConsistentHashRouter — key extracted from message picks the routee
//   LeastBusyRouter      — routee with smallest mailbox depth wins
//   ScatterGatherRouter  — broadcasts then collects N replies into one
//
// All routers expose `add_routee(Actor)` for dynamic membership changes.

package actor_routing_pkg;

import actor_pkg::*;

// ---------------------------------------------------------------------------
// Base router — common bookkeeping
// ---------------------------------------------------------------------------
virtual class Router extends Actor;
    // Membership list, in insertion order.
    Actor routees[$];

    function new(string name = "Router", int capacity = 0);
        super.new(name, capacity);
    endfunction

    // Appends a routee; duplicates are not filtered.
    virtual function void add_routee(Actor r);
        routees.push_back(r);
    endfunction

    // Rebuilds `live` as the routees whose is_alive flag is set, keeping
    // membership order. A stopped routee still owns a mailbox, but nothing
    // drains it, so handing it work would lose the message silently; the
    // selecting routers pick from this filtered view instead, the same way
    // publish() passes over dead subscribers.
    protected function void get_live(ref Actor live[$]);
        Actor candidate;
        live.delete();
        for (int k = 0; k < routees.size(); k++) begin
            candidate = routees[k];
            if (!candidate.is_alive) continue;
            live.push_back(candidate);
        end
    endfunction

    // Drops the first routee whose id equals r.id; no-op if none matches.
    virtual function void remove_routee(Actor r);
        for (int k = 0; k < routees.size(); k++) begin
            if (routees[k].id != r.id) continue;
            routees.delete(k);
            return;
        end
    endfunction
endclass

// ---------------------------------------------------------------------------
// Round-robin: classic load balancer
// ---------------------------------------------------------------------------
class RoundRobinRouter extends Router;
    // Position in the live list that receives the next message.
    int next_idx = 0;

    function new(string name = "RoundRobinRouter", int capacity = 0);
        super.new(name, capacity);
    endfunction

    // Offers the message to the live routee at next_idx, then advances the
    // cursor with wrap-around. Nothing is sent when no routee is alive.
    virtual task act(MsgBase msg);
        Actor pool[$];
        int   pool_size;

        get_live(pool);
        pool_size = pool.size();
        if (pool_size == 0) return;

        // The live set can be smaller than on the previous call.
        if (next_idx >= pool_size) next_idx = 0;

        void'(pool[next_idx].mbox.try_put(msg));
        next_idx = (next_idx + 1) % pool_size;
    endtask
endclass

// ---------------------------------------------------------------------------
// Broadcast: every routee receives every message
// ---------------------------------------------------------------------------
class BroadcastRouter extends Router;
    function new(string name = "BroadcastRouter", int capacity = 0);
        super.new(name, capacity);
    endfunction

    // Offers the same envelope to every live routee, in membership order.
    virtual task act(MsgBase msg);
        for (int k = 0; k < routees.size(); k++) begin
            if (!routees[k].is_alive) continue;
            void'(routees[k].mbox.try_put(msg));
        end
    endtask
endclass

// ---------------------------------------------------------------------------
// Random: uniform pick
// ---------------------------------------------------------------------------
class RandomRouter extends Router;
    function new(string name = "RandomRouter", int capacity = 0);
        super.new(name, capacity);
    endfunction

    // Offers the message to one live routee chosen by $urandom_range.
    virtual task act(MsgBase msg);
        Actor pool[$];
        int   chosen;

        get_live(pool);
        if (pool.size() == 0) return;

        chosen = $urandom_range(pool.size() - 1, 0);
        void'(pool[chosen].mbox.try_put(msg));
    endtask
endclass

//
// ---------------------------------------------------------------------------
// Least-busy: routee with smallest mailbox depth wins
// Useful when routees process messages at different rates.
// ---------------------------------------------------------------------------
class LeastBusyRouter extends Router;
    function new(string name = "LeastBusyRouter", int capacity = 0);
        super.new(name, capacity);
    endfunction

    // Offers the message to the live routee whose mailbox currently holds
    // the fewest entries; on a tie the earliest routee in the live list wins.
    virtual task act(MsgBase msg);
        Actor pool[$];
        int   winner;
        int   shallowest;
        int   depth;

        get_live(pool);
        if (pool.size() == 0) return;

        // Seed with the first routee, then challenge it with the rest.
        winner = 0;
        shallowest = pool[0].mbox.num();
        foreach (pool[k]) begin
            if (k == 0) continue;
            depth = pool[k].mbox.num();
            if (depth < shallowest) begin
                shallowest = depth;
                winner = k;
            end
        end

        void'(pool[winner].mbox.try_put(msg));
    endtask
endclass

// ---------------------------------------------------------------------------
// Consistent hash router — same key always hits same routee.
// Subclass and override `extract_key()` to pick the right field of the
// message (e.g. transaction id, address, master id).
// ---------------------------------------------------------------------------
virtual class ConsistentHashRouter extends Router;
    function new(string name = "ConsistentHashRouter", int capacity = 0);
        super.new(name, capacity);
    endfunction

    // Subclass hook: derive the routing key from the message.
    pure virtual function int unsigned extract_key(MsgBase msg);

    // Offers the message to the live routee at (key mod live-count). The
    // key is extracted only after the live set is known to be non-empty.
    virtual task act(MsgBase msg);
        Actor        pool[$];
        int unsigned key;
        int          slot;

        get_live(pool);
        if (pool.size() == 0) return;

        key = extract_key(msg);
        slot = key % pool.size();
        void'(pool[slot].mbox.try_put(msg));
    endtask
endclass

// ---------------------------------------------------------------------------
// Scatter-gather: broadcast a request to every live routee, collect the
// replies, publish one combined message. Subclass and override `combine()`
// to choose the aggregation policy.
//
// One scatter is in flight at a time: a message received while idle is the
// request (broadcast to the routees), and every message received while
// gathering counts as a reply --- wire the routees' reply type back to this
// router. `expected` fixes the reply count; 0 means one reply per live
// routee at scatter time. The combined message inherits the request's
// trace lineage, so causality survives the gather point.
// ---------------------------------------------------------------------------
virtual class ScatterGatherRouter extends Router;
    int     expected_replies;        // 0 = one per live routee
    int     replies_needed = 0;      // resolved at scatter time
    bit     in_flight = 0;           // 1 between scatter and combine
    longint unsigned scatter_trace = 0;   // trace_id of the request in flight
    MsgBase pending_replies[$];      // replies gathered so far

    function new(string name = "ScatterGatherRouter", int expected = 0,
                 int capacity = 0);
        super.new(name, capacity);
        expected_replies = expected;
    endfunction

    // Aggregation policy: fold the gathered replies into one message.
    // Returning null means "nothing to publish for this round".
    pure virtual function MsgBase combine(MsgBase replies[$]);

    // Idle: treat `msg` as the request and scatter it.
    // Gathering: treat `msg` as a reply; once enough have arrived, publish
    // the combined result and return to idle.
    virtual task act(MsgBase msg);
        if (!in_flight) begin
            // Scatter: broadcast the request to every live routee.
            Actor live[$];
            get_live(live);
            if (live.size() == 0) return;
            // A request that was never stamped still needs a trace_id so the
            // combined reply can be tied back to it.
            if (msg.trace_id == 0) msg.stamp(this.id);
            scatter_trace = msg.trace_id;
            foreach (live[i]) void'(live[i].mbox.try_put(msg));
            replies_needed = (expected_replies > 0) ? expected_replies
                                                    : live.size();
            in_flight = 1;
        end else begin
            // Gather: combine once the expected replies have arrived.
            pending_replies.push_back(msg);
            if (pending_replies.size() >= replies_needed) begin
                MsgBase combined = combine(pending_replies);
                // combine() is user code and may hand back null. Skip the
                // publish in that case instead of dereferencing the null
                // handle, and still fall through to the state reset so the
                // router can serve the next request.
                if (combined != null) begin
                    combined.trace_id = scatter_trace;   // lineage survives the gather
                    combined.parent_span = pending_replies[0].timestamp_ns;
                    publish(combined);
                end else begin
                    $warning("ScatterGatherRouter %s: combine() returned null; nothing published",
                             name);
                end
                pending_replies.delete();
                in_flight = 0;
            end
        end
    endtask
endclass

endpackage
// actor_supervision_pkg.sv
//
// Erlang/OTP-style supervision trees for fault-tolerant verification topologies.
// A Supervisor wraps a set of children and applies a restart strategy when one
// fails. This formalizes the ResetSupervisor pattern shown in Chapter 6
// (actor_supervision_pkg: Fault Tolerance and Lifecycles) and gives it the
// same vocabulary the Erlang/OTP and Akka communities use.
//
// Strategies:
//   ONE_FOR_ONE  — restart only the failed child
//   ONE_FOR_ALL  — restart every child if one fails (for tightly-coupled VIP)
//   REST_FOR_ONE — restart failed child + all started after it (ordered chains)
//
// Restart budget (max_restarts / period) prevents per-child infinite restart
// loops --- similar in spirit to Erlang's `intensity`/`period` shutdown, but
// charged per child, where Erlang's counter is supervisor-wide.

package actor_supervision_pkg;

import actor_pkg::*;

// Which children Supervisor::do_restart() cycles when one of them fails.
typedef enum {
    ONE_FOR_ONE,
    ONE_FOR_ALL,
    REST_FOR_ONE
} SupervisionStrategy_e;

// Verdict returned by Supervisor::on_child_failure() for one failure report.
typedef enum {
    RESTART,    // child crashed but should restart
    STOP,       // child crashed and should stay dead
    RESUME,     // ignore failure, leave child running
    ESCALATE    // promote failure to my own supervisor
} RestartDirective_e;

// ---------------------------------------------------------------------------
// Failure / death messages — first-class structs so any actor can observe
// ---------------------------------------------------------------------------
// Failure report consumed by Supervisor::act(); child_id selects the child
// and reason is passed through to on_child_failure().
typedef struct {
    int unsigned      child_id;
    string            child_name;
    string            reason;
    longint unsigned  timestamp;
} ChildFailureMsg_s;

// Termination notice delivered by DeathWatcher and LinkRegistry; both fill
// timestamp from $time.
typedef struct {
    int unsigned      actor_id;
    string            actor_name;
    longint unsigned  timestamp;
} DeathMsg_s;

// ---------------------------------------------------------------------------
// Supervisor — wraps children with strategy-driven restart.
//
// Restart is stop()+start() on the same object: the supervisor drains the
// child's mailbox so it comes back to an empty queue (Erlang gives a fresh
// process the same guarantee), but member fields persist --- resetting them
// is the child's on_terminate() contract.
// ---------------------------------------------------------------------------
class Supervisor extends Actor;
    Actor                 children[$];            // in supervise() order
    SupervisionStrategy_e strategy = ONE_FOR_ONE;
    int                   max_restarts = 10;      // allowed per child per window
    longint unsigned      restart_window_ns = 64'd60_000_000_000;  // 60s
    int                   restart_count[int];     // child_id -> count
    longint unsigned      window_start[int];      // child_id -> ts (ns)

    function new(string name = "Supervisor",
                 SupervisionStrategy_e strat = ONE_FOR_ONE,
                 int capacity = 0);
        super.new(name, capacity);
        strategy = strat;
    endfunction

    // Adopts `child`. Order matters for REST_FOR_ONE, which restarts the
    // failed child and every child adopted after it.
    virtual function void supervise(Actor child);
        children.push_back(child);
    endfunction

    // Starts every child in adoption order, then the supervisor itself.
    virtual function void start_all();
        foreach (children[i]) children[i].start();
        this.start();
    endfunction

    // Override to choose per-failure directive (default: always restart)
    virtual function RestartDirective_e on_child_failure(int unsigned child_id,
                                                         string reason);
        return RESTART;
    endfunction

    // Handles ChildFailureMsg_s envelopes; every other message type is
    // ignored.
    virtual task act(MsgBase msg);
        if (msg.getTypeName() == $typename(ChildFailureMsg_s)) begin
            ChildFailureMsg_s  f;
            RestartDirective_e d;
            f = Msg#(ChildFailureMsg_s)::unwrap(msg);
            d = on_child_failure(f.child_id, f.reason);
            case (d)
                RESTART:  do_restart(f.child_id);
                STOP:     do_stop(f.child_id);
                RESUME:   ;   // no-op
                ESCALATE: begin
                    // forward the failure up the supervisor chain, preserving lineage
                    Msg#(ChildFailureMsg_s) m = new(f);
                    m.trace_id    = msg.trace_id;
                    m.parent_span = msg.timestamp_ns;
                    publish(m);
                end
            endcase
        end
    endtask

    // Applies the configured strategy to the failed child. Unknown ids are
    // ignored. Ends the simulation with $fatal once the child has used up
    // its restart budget.
    function void do_restart(int unsigned child_id);
        Actor c = find_child(child_id);
        if (c == null) return;

        if (!enforce_budget(child_id)) begin
            // restart_window_ns is a plain nanosecond count, not a
            // simulation time value: print it with %0d. %0t would rescale
            // it through $timeformat and contradict the literal "ns".
            $fatal(1, "Supervisor %s: child %s exceeded restart budget (%0d in %0d ns)",
                   name, c.name, max_restarts, restart_window_ns);
        end

        // In every strategy all affected children are stopped first, then
        // drained, then started, so no child restarts while a sibling from
        // the same group is still running on pre-failure state.
        case (strategy)
            ONE_FOR_ONE: begin
                c.stop();
                drain_mbox(c);
                c.start();
            end
            ONE_FOR_ALL: begin
                foreach (children[i]) children[i].stop();
                foreach (children[i]) drain_mbox(children[i]);
                foreach (children[i]) children[i].start();
            end
            REST_FOR_ONE: begin
                int idx = find_index(child_id);
                if (idx < 0) return;
                for (int i = idx; i < children.size(); i++) children[i].stop();
                for (int i = idx; i < children.size(); i++) drain_mbox(children[i]);
                for (int i = idx; i < children.size(); i++) children[i].start();
            end
        endcase
    endfunction

    // A restarted child must not replay the queue that led to the failure
    // (possibly the crashing message itself, re-crashing until the budget
    // $fatal fires).
    function void drain_mbox(Actor c);
        MsgBase m;
        while (c.mbox.try_get(m) != 0) ;
    endfunction

    // Stops the child and leaves it stopped; its mailbox is not drained.
    function void do_stop(int unsigned child_id);
        Actor c = find_child(child_id);
        if (c != null) c.stop();
    endfunction

    // Charges one restart to `child_id` and returns 1 while the child is
    // still within max_restarts for the current window. A new window opens
    // on the first restart, or once more than restart_window_ns has passed
    // since the current window opened.
    function bit enforce_budget(int unsigned child_id);
        // The window is expressed in nanoseconds, so measure "now" in
        // nanoseconds too. Raw $time is in the scope's time unit and would
        // stretch or shrink the window under any timeunit other than 1ns.
        // Assumes a time precision of 1ns or finer.
        longint unsigned now = longint'($realtime / 1ns);
        if (!window_start.exists(child_id) ||
            (now - window_start[child_id]) > restart_window_ns) begin
            window_start[child_id]  = now;
            restart_count[child_id] = 0;
        end
        restart_count[child_id]++;
        return (restart_count[child_id] <= max_restarts);
    endfunction

    // Child handle for `child_id`, or null when it is not supervised here.
    function Actor find_child(int unsigned child_id);
        foreach (children[i])
            if (children[i].id == child_id) return children[i];
        return null;
    endfunction

    // Position of `child_id` in adoption order, or -1.
    function int find_index(int unsigned child_id);
        foreach (children[i])
            if (children[i].id == child_id) return i;
        return -1;
    endfunction
endclass

// ---------------------------------------------------------------------------
// DeathWatcher — one-way termination notification (Erlang's `monitor`)
// ---------------------------------------------------------------------------
class DeathWatcher;
    // target actor id -> actors to notify when that target dies
    Actor watchers_by_target[int][$];

    // Registers `watcher` to be told when `target` dies.
    function void monitor(Actor watcher, Actor target);
        watchers_by_target[target.id].push_back(watcher);
    endfunction

    // Puts a DeathMsg_s naming the target into every registered watcher's
    // mailbox. Each watcher gets its own envelope, stamped with sender id 0
    // because a DeathWatcher is not itself an Actor. A full mailbox drops
    // the notice.
    function void notify_death(int unsigned target_id, string target_name);
        DeathMsg_s d;
        d.actor_id   = target_id;
        d.actor_name = target_name;
        d.timestamp  = $time;
        if (watchers_by_target.exists(target_id)) begin
            Actor watchers[$] = watchers_by_target[target_id];
            foreach (watchers[i]) begin
                Msg#(DeathMsg_s) m = new(d);
                m.stamp(0);
                void'(watchers[i].mbox.try_put(m));
            end
        end
    endfunction
endclass

// ---------------------------------------------------------------------------
// LinkRegistry — bidirectional fate sharing (Erlang's `link`)
// If either actor in a link dies, the surviving peer receives a DeathMsg_s
// naming the dead actor, directly in its own mailbox.
// ---------------------------------------------------------------------------
class LinkRegistry;
    Actor peers_of[int][$];   // actor id -> linked peer handles

    // Records the link in both directions. Linking the same pair twice
    // stores it twice, so each peer is then notified twice.
    function void link(Actor a, Actor b);
        peers_of[a.id].push_back(b);
        peers_of[b.id].push_back(a);
    endfunction

    // Deliver a DeathMsg_s naming `dead` to every still-living linked peer;
    // optionally fan out through `dw` so the dead actor's monitors hear too.
    function void on_death(Actor dead, DeathWatcher dw = null);
        DeathMsg_s d;
        Msg#(DeathMsg_s) m;
        d.actor_id   = dead.id;
        d.actor_name = dead.name;
        d.timestamp  = $time;
        if (peers_of.exists(dead.id)) begin
            Actor peers[$] = peers_of[dead.id];
            foreach (peers[i]) begin
                if (!peers[i].is_alive) continue;
                // Fresh envelope per peer, stamped as sent by the dead actor.
                m = new(d);
                m.stamp(dead.id);
                void'(peers[i].mbox.try_put(m));
            end
        end
        if (dw != null) dw.notify_death(dead.id, dead.name);
    endfunction
endclass

endpackage
// actor_test_pkg.sv
//
// Test-kit primitives for unit-testing actors in isolation. The Actor model's
// strict mailbox-only interaction means each actor is genuinely unit-testable
// without standing up the full topology — which is the per-actor CI/CD claim
// of Chapter 6's unit-testing discussion.
//
//   ProbeActor — captures every received message into a queue
//   FakeActor  — programmable response: rule-based reply to incoming msg
//   ExpectKit  — assertion helpers: expect_message, expect_no_message,
//                expect_count
//
// Typical test layout (type_name keys are $typename() strings, not bare
// typedef names — $typename of a package struct is the long structural form):
//   bit ok;
//   ProbeActor probe = new();
//   `WIRE(dut_actor, MyResponse_s, probe)   // one `WIRE per type probed
//   dut_actor.start(); probe.start();
//   `PUBLISH_TO(dut_actor, my_request_struct);
//   ExpectKit::expect_message(probe, $typename(MyResponse_s), 100, ok);

package actor_test_pkg;

import actor_pkg::*;

// ---------------------------------------------------------------------------
// ProbeActor — passive sink that captures everything for later assertions
// ---------------------------------------------------------------------------
class ProbeActor extends Actor;
  // Every message handed to act(), oldest first.
  MsgBase received[$];

  function new(string name = "Probe");
    super.new(name);
  endfunction

  // Record the message as-is; nothing is filtered or forwarded.
  virtual task act(MsgBase msg);
    received.push_back(msg);
  endtask

  // Number of captured messages whose getTypeName() equals `type_name`.
  function int count_of_type(string type_name);
    MsgBase hits[$];
    hits = received.find with (item.getTypeName() == type_name);
    return hits.size();
  endfunction

  // Oldest captured message of the given type, or null when none was seen.
  function MsgBase first_of_type(string type_name);
    MsgBase hit[$];
    hit = received.find_first with (item.getTypeName() == type_name);
    return (hit.size() != 0) ? hit[0] : null;
  endfunction

  // Forget everything captured so far.
  function void clear();
    received = {};
  endfunction
endclass

// ---------------------------------------------------------------------------
// FakeActor — programmable response actor for testing the actors that send
// requests.
// Set up rules: "when you see message of type X, reply with Y".
// ---------------------------------------------------------------------------

// One FakeActor rule: the type-name string to match and the reply to publish.
typedef struct {
  string  match_type;
  MsgBase reply;
} FakeRule_s;

class FakeActor extends Actor;
  // Rules in registration order; act() uses the first one that matches.
  FakeRule_s rules[$];

  function new(string name = "Fake");
    super.new(name);
  endfunction

  // Append a rule: a message whose getTypeName() equals `match_type` is
  // answered by publishing `reply`.
  function void on_receive_reply_with(string match_type, MsgBase reply);
    FakeRule_s r;
    r.match_type = match_type;
    r.reply      = reply;
    rules.push_back(r);
  endfunction

  // Publish the reply of the first matching rule, after copying the request's
  // trace_id into it and setting parent_span to the request's timestamp_ns.
  // Messages matching no rule are ignored.
  virtual task act(MsgBase msg);
    foreach (rules[i]) begin
      if (rules[i].match_type == msg.getTypeName()) begin
        // NOTE: every matching request republishes the SAME reply object;
        // retaining subscribers hold N aliases whose lineage reflects only
        // the last request. Use one rule per expected request (or subclass
        // with a reply factory) when per-request lineage matters.
        rules[i].reply.trace_id    = msg.trace_id;
        rules[i].reply.parent_span = msg.timestamp_ns;
        publish(rules[i].reply);
        break;
      end
    end
  endtask
endclass

// ---------------------------------------------------------------------------
// ExpectKit — assertion helpers. Designed to be called from test code, not
// production. Each helper reports success via its `ok` output (or its return
// value, for expect_count) and $displays a diagnostic on failure (test
// runners can hook this).
// ---------------------------------------------------------------------------
class ExpectKit;

  // Wait up to `timeout_ns` for the probe to hold a message of `type_name`.
  //
  // The probe is checked immediately and then once after every elapsed
  // nanosecond, including the deadline tick itself, so `timeout_ns == 0` is a
  // plain "is it already there?" check. Elapsed time is counted in `#1ns`
  // steps rather than derived from $time, so the budget does not depend on
  // the enclosing time unit.
  //   ok — 1 when a matching message was seen, 0 on timeout.
  static task expect_message(ProbeActor probe, string type_name,
                             longint unsigned timeout_ns, output bit ok);
    longint unsigned waited_ns = 0;
    ok = 0;
    forever begin
      if (probe.count_of_type(type_name) > 0) begin
        ok = 1;
        return;
      end
      if (waited_ns >= timeout_ns) break;
      #1ns;
      waited_ns++;
    end
    // %0d, not %0t: timeout_ns is a plain nanosecond count and must not be
    // rescaled by $timeformat / the time precision.
    $display("[Expect FAIL] no message of type %s within %0d ns",
             type_name, timeout_ns);
  endtask

  // Wait `within_ns` nanoseconds and confirm that no further message of
  // `type_name` reached the probe in that interval.
  //   ok — 1 when the count is unchanged, 0 otherwise.
  static task expect_no_message(ProbeActor probe, string type_name,
                                longint unsigned within_ns, output bit ok);
    int starting = probe.count_of_type(type_name);
    #(within_ns * 1ns);
    ok = (probe.count_of_type(type_name) == starting);
    if (!ok)
      $display("[Expect FAIL] received unexpected %s within %0d ns",
               type_name, within_ns);
  endtask

  // Return 1 when the probe holds exactly `expected` messages of `type_name`;
  // otherwise print a diagnostic and return 0.
  static function bit expect_count(ProbeActor probe, string type_name, int expected);
    int actual = probe.count_of_type(type_name);
    if (actual != expected) begin
      $display("[Expect FAIL] count(%s): expected %0d, got %0d",
               type_name, expected, actual);
      return 0;
    end
    return 1;
  endfunction

endclass

endpackage
// actor_verification_pkg.sv // // Verification-domain base actors. These formalize the patterns that Chapter 6 // shows ad-hoc (CoverageActor, ConstraintActor) into reusable scaffolding so // users only override the bits that change per-DUT. // // ConstraintActor — randomize-and-publish loop with hot-swap policies // CoverageActor — covergroup helpers + sample-on-receive // ScoreboardActor — paired-message comparator (request vs response) // SpecActor — golden model that consumes the same input as DUT // DiffActor — compares two output streams (DUT vs Spec) // // All inherit from Actor and integrate with supervision, observability, and // distribution unmodified. package actor_verification_pkg; import actor_pkg::*; // --------------------------------------------------------------------------- // ConstraintActor — randomize a typed payload and publish in a loop. // Override `randomize_and_publish()` and pick a `rate_ns` for the cadence. // // Hot-swap: stop() the old instance and `WIRE a replacement from the // parent that owns the topology to change the entire // stimulus distribution without touching downstream actors. This is the // architectural improvement over UVM's factory-override + sequence-override // dance. 
// ---------------------------------------------------------------------------
virtual class ConstraintActor extends Actor;
  longint unsigned rate_ns = 100;   // gap after each publish, in ns (0 = none)
  bit              running = 1;     // cleared by pause(), set by resume()

  function new(string name = "ConstraintActor", longint unsigned rate = 100);
    super.new(name);
    rate_ns = rate;
  endfunction

  // Subclass hook: build one randomized payload and publish it.
  pure virtual task randomize_and_publish();

  // Stimulus loop. While running, publish and then wait `rate_ns`; while
  // paused, poll the `running` flag once per nanosecond.
  // NOTE(review): with rate_ns == 0 the loop advances only as far as
  // randomize_and_publish() itself blocks — confirm subclasses always block.
  virtual task run();
    realtime gap;
    forever begin
      if (running) begin
        randomize_and_publish();
        if (rate_ns > 0) begin
          gap = rate_ns * 1ns;
          #(gap);
        end
      end
      else begin
        #1ns;
      end
    end
  endtask

  // Suspend publishing; run() keeps polling until resume() is called.
  virtual function void pause();
    running = 0;
  endfunction

  // Let run() publish again.
  virtual function void resume();
    running = 1;
  endfunction
endclass

// ---------------------------------------------------------------------------
// CoverageActor — convenient base for subscribers that maintain a covergroup
// and sample on every received message of a specific type. Override
// `sample_one()` to extract fields and call `cg.sample()`.
//
// The covergroup itself must live in the subclass because SV does not allow
// covergroup declarations inside virtual classes referencing subclass state.
// ---------------------------------------------------------------------------
virtual class CoverageActor extends Actor;
  // Count of messages passed to sample_one() so far.
  longint unsigned samples_taken = 0;

  function new(string name = "CoverageActor");
    super.new(name);
  endfunction

  // Subclass hook: pull the covered fields out of `msg` and sample.
  pure virtual function void sample_one(MsgBase msg);

  // Sample first, then bump the counter.
  virtual task act(MsgBase msg);
    sample_one(msg);
    samples_taken += 1;
  endtask
endclass

// ---------------------------------------------------------------------------
// ScoreboardActor — pairs requests with responses by some key, computes a
// pass/fail outcome. Override `is_request`, `is_response`, `key_of`, and
// `compare`. The base class manages the pending table and emits Mismatch_s
// events on any failure.
// ---------------------------------------------------------------------------

// Event published by ScoreboardActor for every failed or orphaned response.
typedef struct {
  longint unsigned trace_id;
  string           description;
  longint unsigned timestamp;
} Mismatch_s;

virtual class ScoreboardActor extends Actor;
  int pending_count  = 0;   // requests still waiting for a response
  int match_count    = 0;   // responses that compared equal
  int mismatch_count = 0;   // failed comparisons plus orphan responses

  // key -> request FIFO (duplicate outstanding keys pair in order)
  MsgBase pending[longint unsigned][$];

  function new(string name = "ScoreboardActor");
    super.new(name);
  endfunction

  // Subclass hooks: classify a message, extract its pairing key, and compare
  // a request/response pair (`reason` describes a failed comparison).
  pure virtual function bit              is_request (MsgBase msg);
  pure virtual function bit              is_response(MsgBase msg);
  pure virtual function longint unsigned key_of     (MsgBase msg);
  pure virtual function bit              compare    (MsgBase req, MsgBase rsp,
                                                     output string reason);

  // Requests are queued under their key. A response is paired with the oldest
  // queued request of the same key and compared; a failed comparison, or a
  // response with no queued request, publishes a Mismatch_s. Messages that
  // are neither request nor response are ignored.
  virtual task act(MsgBase msg);
    longint unsigned key;
    string           why;
    bit              paired;
    bit              failed;

    if (is_request(msg)) begin
      pending[key_of(msg)].push_back(msg);
      pending_count++;
      return;
    end
    if (!is_response(msg)) return;

    key    = key_of(msg);
    paired = pending.exists(key) && (pending[key].size() != 0);

    if (paired) begin
      MsgBase req = pending[key].pop_front();
      failed = !compare(req, msg, why);
    end
    else begin
      why    = $sformatf("orphan response key=%0d", key);
      failed = 1;
    end

    if (failed) begin
      Mismatch_s m;
      m.trace_id    = msg.trace_id;
      m.description = why;
      m.timestamp   = $time;
      mismatch_count++;
      `PUBLISH(m);
    end
    else begin
      match_count++;
    end

    // Bookkeeping for a consumed request happens after the verdict is out;
    // an emptied FIFO is removed from the table.
    if (paired) begin
      if (pending[key].size() == 0) pending.delete(key);
      pending_count--;
    end
  endtask

  // Print the running totals.
  function void report();
    $display("[Scoreboard %s] match=%0d mismatch=%0d pending=%0d",
             name, match_count, mismatch_count, pending_count);
  endfunction
endclass

// ---------------------------------------------------------------------------
// SpecActor — runnable golden model. Consumes the same input messages as the
// DUT, emits its own response messages on the same topic structure. A
// DiffActor downstream compares them per-trace_id.
//
// Override `compute_response()` to encode the specification.
// ---------------------------------------------------------------------------
virtual class SpecActor extends Actor;
  function new(string name = "SpecActor");
    super.new(name);
  endfunction

  // Subclass hook: the specification. Return null to emit nothing.
  pure virtual function MsgBase compute_response(MsgBase request);

  // Publish the model's response, carrying over the request's trace_id and
  // using the request's timestamp_ns as parent_span.
  virtual task act(MsgBase msg);
    MsgBase expected = compute_response(msg);
    if (expected == null) return;
    expected.trace_id    = msg.trace_id;
    expected.parent_span = msg.timestamp_ns;
    publish(expected);
  endtask
endclass

// ---------------------------------------------------------------------------
// DiffActor — subscribes to both DUT and Spec output streams, pairs them by
// trace_id, flags divergence. The simplest possible specification check.
// ---------------------------------------------------------------------------
virtual class DiffActor extends Actor;
  // trace_id -> response FIFO (multi-response flows pair in order)
  MsgBase from_dut [longint unsigned][$];
  MsgBase from_spec[longint unsigned][$];
  int diff_count  = 0;
  int match_count = 0;

  function new(string name = "DiffActor");
    super.new(name);
  endfunction

  // Subclass hook: payload equality between a DUT and a Spec message.
  pure virtual function bit equal(MsgBase a, MsgBase b);

  // Override to identify message origin (dut vs spec)
  pure virtual function bit is_from_dut(MsgBase msg);

  // File the message under its trace_id on the side it came from, then try
  // to pair that trace.
  virtual task act(MsgBase msg);
    bit              dut_side = is_from_dut(msg);
    longint unsigned tid      = msg.trace_id;
    if (dut_side) from_dut[tid].push_back(msg);
    else          from_spec[tid].push_back(msg);
    try_pair(tid);
  endtask

  // When both sides hold a message for `tid`, pop the oldest of each, compare
  // them, and drop whichever FIFO became empty.
  function void try_pair(longint unsigned tid);
    MsgBase dut_msg;
    MsgBase spec_msg;

    if (!from_dut.exists(tid) || !from_spec.exists(tid)) return;
    if (from_dut[tid].size() == 0 || from_spec[tid].size() == 0) return;

    dut_msg  = from_dut[tid].pop_front();
    spec_msg = from_spec[tid].pop_front();

    if (!equal(dut_msg, spec_msg)) begin
      diff_count++;
      $display("[DiffActor %s] divergence trace=%0d", name, tid);
    end
    else begin
      match_count++;
    end

    if (from_dut[tid].size() == 0)  from_dut.delete(tid);
    if (from_spec[tid].size() == 0) from_spec.delete(tid);
  endfunction

  // Print totals; "outstanding" is the number of trace_ids (not messages)
  // still waiting on the DUT side / Spec side.
  function void report();
    $display("[Diff %s] match=%0d divergence=%0d outstanding=%0d/%0d",
             name, match_count, diff_count,
             from_dut.size(), from_spec.size());
  endfunction
endclass

endpackage