Skip to main content

hydro_lang/sim/
compiled.rs

//! Interfaces for compiled Hydro simulators and concrete simulation instances.
//!
//! NOTE: This module runs inside bolero's `catch_unwind` scope, which silently
//! swallows panics. Internal invariant checks should use `abort_assert!`
//! rather than `panic!`/`assert!`.
//!
//! TODO(mingwei): Panics inside the tick DFIR (generated code in the dylib) are
//! also caught by bolero's `catch_unwind`. Consider a mechanism to detect and
//! propagate those as well.
10
/// Checks an internal invariant and terminates the whole process when it does not hold.
///
/// Takes a condition followed by a mandatory `format!`-style message. On failure the message
/// is written to stderr prefixed with `Simulator internal error: ` and
/// `std::process::abort()` is called instead of panicking, so the failure cannot be
/// swallowed by a surrounding `catch_unwind`.
macro_rules! abort_assert {
    ($cond:expr, $($arg:tt)*) => {
        if !($cond) {
            eprintln!("Simulator internal error: {}", format_args!($($arg)*));
            ::std::process::abort();
        }
    };
}
21
22use core::{fmt, panic};
23use std::cell::{Cell, RefCell};
24use std::collections::{HashMap, VecDeque};
25use std::fmt::Debug;
26use std::panic::RefUnwindSafe;
27use std::path::Path;
28use std::pin::{Pin, pin};
29use std::rc::Rc;
30use std::task::ready;
31
32use bytes::Bytes;
33use colored::Colorize;
34use dfir_rs::scheduled::context::DfirErased;
35use futures::{Stream, StreamExt};
36use libloading::Library;
37use serde::Serialize;
38use serde::de::DeserializeOwned;
39use tempfile::TempPath;
40use tokio::sync::mpsc::UnboundedSender;
41use tokio::sync::{Mutex, Notify};
42use tokio_stream::wrappers::UnboundedReceiverStream;
43
44use super::runtime::{Hooks, InlineHooks};
45use super::{SimClusterReceiver, SimClusterSender, SimReceiver, SimSender};
46use crate::compile::builder::ExternalPortId;
47use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
48use crate::location::dynamic::LocationId;
49use crate::sim::graph::{SimExternalPort, SimExternalPortRegistry};
50use crate::sim::runtime::SimHook;
51
52struct QuiescenceState {
53    /// Set to true when the scheduler reaches quiescence; reset to false when new input is sent.
54    quiescent: Cell<bool>,
55    /// Notified when the scheduler reaches quiescence (wakes receivers waiting for data).
56    quiescence_notify: Notify,
57    /// Notified when new input is sent, signaling the scheduler to resume.
58    resume_notify: Notify,
59}
60
61impl QuiescenceState {
62    /// Signal that new input has been sent, waking the scheduler if it was quiescent.
63    fn resume(&self) {
64        self.quiescent.set(false);
65        self.resume_notify.notify_waiters();
66    }
67
68    /// Whether the scheduler is currently quiescent (no more progress possible without input).
69    fn is_quiescent(&self) -> bool {
70        self.quiescent.get()
71    }
72
73    /// Returns a future that completes when the scheduler next reaches quiescence.
74    fn notified(&self) -> tokio::sync::futures::Notified<'_> {
75        self.quiescence_notify.notified()
76    }
77
78    /// Enter quiescence and wait for new input before continuing.
79    async fn wait_for_resume(&self) {
80        self.quiescent.set(true);
81        self.quiescence_notify.notify_waiters();
82        self.resume_notify.notified().await;
83        self.quiescent.set(false);
84    }
85}
86
87struct SimConnections {
88    input_senders: HashMap<SimExternalPort, Rc<UnboundedSender<Bytes>>>,
89    output_receivers: HashMap<SimExternalPort, Rc<Mutex<UnboundedReceiverStream<Bytes>>>>,
90    cluster_input_senders: HashMap<SimExternalPort, Vec<Rc<UnboundedSender<Bytes>>>>,
91    cluster_output_receivers:
92        HashMap<SimExternalPort, Vec<Rc<Mutex<UnboundedReceiverStream<Bytes>>>>>,
93    external_registered: HashMap<ExternalPortId, SimExternalPort>,
94    quiescence: Rc<QuiescenceState>,
95}
96
97tokio::task_local! {
98    static CURRENT_SIM_CONNECTIONS: RefCell<SimConnections>;
99}
100
101/// A handle to a compiled Hydro simulation, which can be instantiated and run.
102pub struct CompiledSim {
103    pub(super) _path: TempPath,
104    pub(super) lib: Library,
105    pub(super) externals_port_registry: SimExternalPortRegistry,
106    pub(super) unit_test_fuzz_iterations: usize,
107}
108
109#[sealed::sealed]
110/// A trait implemented by closures that can instantiate a compiled simulation.
111///
112/// This is needed to ensure [`RefUnwindSafe`] so instances can be created during fuzzing.
113pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
114#[sealed::sealed]
115impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
116
/// Log sink that discards everything it is given.
fn null_handler(_: fmt::Arguments<'_>) {}
118
/// Log sink that writes the formatted message, followed by a newline, to stdout.
fn println_handler(args: fmt::Arguments<'_>) {
    println!("{args}");
}
122
/// Log sink that writes the formatted message, followed by a newline, to stderr.
fn eprintln_handler(args: fmt::Arguments<'_>) {
    eprintln!("{args}");
}
126
127/// Creates a simulation instance, returning:
128/// - A list of async DFIRs to run (all process / cluster logic outside a tick)
129/// - A list of tick DFIRs to run (where the &'static str is for the tick location id)
130/// - A mapping of hooks for non-deterministic decisions at tick-input boundaries
131/// - A mapping of inline hooks for non-deterministic decisions inside ticks
132type SimLoaded<'a> = libloading::Symbol<
133    'a,
134    unsafe extern "Rust" fn(
135        should_color: bool,
136        external_out: &mut HashMap<usize, UnboundedReceiverStream<Bytes>>,
137        external_in: &mut HashMap<usize, UnboundedSender<Bytes>>,
138        cluster_external_out: &mut HashMap<usize, Vec<UnboundedReceiverStream<Bytes>>>,
139        cluster_external_in: &mut HashMap<usize, Vec<UnboundedSender<Bytes>>>,
140        println_handler: fn(fmt::Arguments<'_>),
141        eprintln_handler: fn(fmt::Arguments<'_>),
142    ) -> (
143        Vec<(&'static str, Option<u32>, DfirErased)>,
144        Vec<(&'static str, Option<u32>, DfirErased)>,
145        Hooks<&'static str>,
146        InlineHooks<&'static str>,
147    ),
148>;
149
150impl CompiledSim {
151    /// Executes the given closure with a single instance of the compiled simulation.
152    pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
153        self.with_instantiator(|instantiator| thunk(instantiator()), true)
154    }
155
156    /// Executes the given closure with an [`Instantiator`], which can be called to create
157    /// independent instances of the simulation. This is useful for fuzzing, where we need to
158    /// re-execute the simulation several times with different decisions.
159    ///
160    /// The `always_log` parameter controls whether to log tick executions and stream releases. If
161    /// it is `true`, logging will always be enabled. If it is `false`, logging will only be
162    /// enabled if the `HYDRO_SIM_LOG` environment variable is set to `1`.
163    pub fn with_instantiator<T>(
164        &self,
165        thunk: impl FnOnce(&dyn Instantiator) -> T,
166        always_log: bool,
167    ) -> T {
168        let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
169        let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
170        thunk(
171            &(|| CompiledSimInstance {
172                func: func.clone(),
173                externals_port_registry: self.externals_port_registry.clone(),
174                dylib_result: None,
175                log,
176            }),
177        )
178    }
179
180    /// Uses a fuzzing strategy to explore possible executions of the simulation. The provided
181    /// closure will be repeatedly executed with instances of the Hydro program where the
182    /// batching boundaries, order of messages, and retries are varied.
183    ///
184    /// During development, you should run the test that invokes this function with the `cargo sim`
185    /// command, which will use `libfuzzer` to intelligently explore the execution space. If a
186    /// failure is found, a minimized test case will be produced in a `sim-failures` directory.
187    /// When running the test with `cargo test` (such as in CI), if a reproducer is found it will
188    /// be executed, and if no reproducer is found a small number of random executions will be
189    /// performed.
190    pub fn fuzz(&self, mut thunk: impl AsyncFn() + RefUnwindSafe) {
191        let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
192            .elements()
193            .into_iter()
194            .find(|e| {
195                !e.fn_name.starts_with("hydro_lang::sim::compiled")
196                    && !e.fn_name.starts_with("hydro_lang::sim::flow")
197                    && !e.fn_name.starts_with("fuzz<")
198                    && !e.fn_name.starts_with("<hydro_lang::sim")
199            })
200            .unwrap();
201
202        let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
203        let repro_folder = caller_path.parent().unwrap().join("sim-failures");
204
205        let caller_fuzz_repro_path = repro_folder
206            .join(caller_fn.fn_name.replace("::", "__"))
207            .with_extension("bin");
208
209        if std::env::var("BOLERO_FUZZER").is_ok() {
210            let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
211            std::fs::create_dir_all(&corpus_dir).unwrap();
212            let libfuzzer_args = format!(
213                "{} {} -artifact_prefix={}/ -handle_abrt=0",
214                corpus_dir.to_str().unwrap(),
215                corpus_dir.to_str().unwrap(),
216                corpus_dir.to_str().unwrap(),
217            );
218
219            std::fs::create_dir_all(&repro_folder).unwrap();
220
221            if !std::env::var("HYDRO_NO_FAILURE_OUTPUT").is_ok_and(|v| v == "1") {
222                unsafe {
223                    std::env::set_var(
224                        "BOLERO_FAILURE_OUTPUT",
225                        caller_fuzz_repro_path.to_str().unwrap(),
226                    );
227                }
228            }
229
230            unsafe {
231                std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
232            }
233
234            self.with_instantiator(
235                |instantiator| {
236                    bolero::test(bolero::TargetLocation {
237                        package_name: "",
238                        manifest_dir: "",
239                        module_path: "",
240                        file: "",
241                        line: 0,
242                        item_path: "<unknown>::__bolero_item_path__",
243                        test_name: None,
244                    })
245                    .run_with_replay(move |is_replay| {
246                        let mut instance = instantiator();
247
248                        if instance.log {
249                            eprintln!(
250                                "{}",
251                                "\n==== New Simulation Instance ===="
252                                    .color(colored::Color::Cyan)
253                                    .bold()
254                            );
255                        }
256
257                        if is_replay {
258                            instance.log = true;
259                        }
260
261                        tokio::runtime::Builder::new_current_thread()
262                            .build()
263                            .unwrap()
264                            .block_on(async { instance.run(&mut thunk).await })
265                    })
266                },
267                false,
268            );
269        } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
270            self.fuzz_repro(existing_bytes, async |compiled| {
271                compiled.launch();
272                thunk().await
273            });
274        } else {
275            eprintln!(
276                "Running a fuzz test without `cargo sim` and no reproducer found at {}, using {} iterations with random inputs.",
277                caller_fuzz_repro_path.display(),
278                self.unit_test_fuzz_iterations,
279            );
280            self.with_instantiator(
281                |instantiator| {
282                    bolero::test(bolero::TargetLocation {
283                        package_name: "",
284                        manifest_dir: "",
285                        module_path: "",
286                        file: ".",
287                        line: 0,
288                        item_path: "<unknown>::__bolero_item_path__",
289                        test_name: None,
290                    })
291                    .with_iterations(self.unit_test_fuzz_iterations)
292                    .run(move || {
293                        let instance = instantiator();
294                        tokio::runtime::Builder::new_current_thread()
295                            .build()
296                            .unwrap()
297                            .block_on(async { instance.run(&mut thunk).await })
298                    })
299                },
300                false,
301            );
302        }
303    }
304
305    /// Executes the given closure with a single instance of the compiled simulation, using the
306    /// provided bytes as the source of fuzzing decisions. This can be used to manually reproduce a
307    /// failure found during fuzzing.
308    pub fn fuzz_repro<'a>(
309        &'a self,
310        bytes: Vec<u8>,
311        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
312    ) {
313        self.with_instance(|instance| {
314            bolero::bolero_engine::any::scope::with(
315                Box::new(bolero::bolero_engine::driver::object::Object(
316                    bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
317                )),
318                || {
319                    tokio::runtime::Builder::new_current_thread()
320                        .build()
321                        .unwrap()
322                        .block_on(async { instance.run_without_launching(thunk).await })
323                },
324            )
325        });
326    }
327
328    /// Exhaustively searches all possible executions of the simulation. The provided
329    /// closure will be repeatedly executed with instances of the Hydro program where the
330    /// batching boundaries, order of messages, and retries are varied.
331    ///
332    /// Exhaustive searching is feasible when the inputs to the Hydro program are finite and there
333    /// are no dataflow loops that generate infinite messages. Exhaustive searching provides a
334    /// stronger guarantee of correctness than fuzzing, but may take a long time to complete.
335    /// Because no fuzzer is involved, you can run exhaustive tests with `cargo test`.
336    ///
337    /// Returns the number of distinct executions explored.
338    pub fn exhaustive(&self, mut thunk: impl AsyncFnMut() + RefUnwindSafe) -> usize {
339        if std::env::var("BOLERO_FUZZER").is_ok() {
340            eprintln!(
341                "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
342            );
343            std::process::abort();
344        }
345
346        let mut count = 0;
347        let count_mut = &mut count;
348
349        self.with_instantiator(
350            |instantiator| {
351                bolero::test(bolero::TargetLocation {
352                    package_name: "",
353                    manifest_dir: "",
354                    module_path: "",
355                    file: "",
356                    line: 0,
357                    item_path: "<unknown>::__bolero_item_path__",
358                    test_name: None,
359                })
360                .exhaustive()
361                .run_with_replay(move |is_replay| {
362                    *count_mut += 1;
363
364                    let mut instance = instantiator();
365                    if instance.log {
366                        eprintln!(
367                            "{}",
368                            "\n==== New Simulation Instance ===="
369                                .color(colored::Color::Cyan)
370                                .bold()
371                        );
372                    }
373
374                    if is_replay {
375                        instance.log = true;
376                    }
377
378                    tokio::runtime::Builder::new_current_thread()
379                        .build()
380                        .unwrap()
381                        .block_on(async { instance.run(&mut thunk).await })
382                })
383            },
384            false,
385        );
386
387        count
388    }
389}
390
391// This must be a tuple because it is referenced from generated code in `graph.rs`.
392type DylibResult = (
393    Vec<(&'static str, Option<u32>, DfirErased)>,
394    Vec<(&'static str, Option<u32>, DfirErased)>,
395    Hooks<&'static str>,
396    InlineHooks<&'static str>,
397);
398
399/// A single instance of a compiled Hydro simulation, which provides methods to interactively
400/// execute the simulation, feed inputs, and receive outputs.
401pub struct CompiledSimInstance<'a> {
402    func: SimLoaded<'a>,
403    externals_port_registry: SimExternalPortRegistry,
404    dylib_result: Option<DylibResult>,
405    log: bool,
406}
407
408impl<'a> CompiledSimInstance<'a> {
409    async fn run(self, thunk: impl AsyncFnOnce() + RefUnwindSafe) {
410        self.run_without_launching(async |instance| {
411            instance.launch();
412            thunk().await;
413        })
414        .await;
415    }
416
417    async fn run_without_launching(
418        mut self,
419        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
420    ) {
421        let mut external_out: HashMap<usize, UnboundedReceiverStream<Bytes>> = HashMap::new();
422        let mut external_in: HashMap<usize, UnboundedSender<Bytes>> = HashMap::new();
423        let mut cluster_external_out: HashMap<usize, Vec<UnboundedReceiverStream<Bytes>>> =
424            HashMap::new();
425        let mut cluster_external_in: HashMap<usize, Vec<UnboundedSender<Bytes>>> = HashMap::new();
426
427        let dylib_result = unsafe {
428            (self.func)(
429                colored::control::SHOULD_COLORIZE.should_colorize(),
430                &mut external_out,
431                &mut external_in,
432                &mut cluster_external_out,
433                &mut cluster_external_in,
434                if self.log {
435                    println_handler
436                } else {
437                    null_handler
438                },
439                if self.log {
440                    eprintln_handler
441                } else {
442                    null_handler
443                },
444            )
445        };
446
447        let registered = &self.externals_port_registry.registered;
448
449        let quiescence = Rc::new(QuiescenceState {
450            quiescent: Cell::new(false),
451            quiescence_notify: Notify::new(),
452            resume_notify: Notify::new(),
453        });
454
455        let mut input_senders = HashMap::new();
456        let mut output_receivers = HashMap::new();
457        let mut cluster_input_senders = HashMap::new();
458        let mut cluster_output_receivers = HashMap::new();
459
460        #[expect(
461            clippy::disallowed_methods,
462            reason = "inserts into maps also unordered"
463        )]
464        for sim_port in registered.values() {
465            let usize_key = sim_port.into_inner();
466            if let Some(sender) = external_in.remove(&usize_key) {
467                input_senders.insert(*sim_port, Rc::new(sender));
468            }
469            if let Some(receiver) = external_out.remove(&usize_key) {
470                output_receivers.insert(*sim_port, Rc::new(Mutex::new(receiver)));
471            }
472            if let Some(senders) = cluster_external_in.remove(&usize_key) {
473                cluster_input_senders.insert(*sim_port, senders.into_iter().map(Rc::new).collect());
474            }
475            if let Some(receivers) = cluster_external_out.remove(&usize_key) {
476                cluster_output_receivers.insert(
477                    *sim_port,
478                    receivers
479                        .into_iter()
480                        .map(|r| Rc::new(Mutex::new(r)))
481                        .collect(),
482                );
483            }
484        }
485
486        self.dylib_result = Some(dylib_result);
487
488        let local_set = tokio::task::LocalSet::new();
489        local_set
490            .run_until(CURRENT_SIM_CONNECTIONS.scope(
491                RefCell::new(SimConnections {
492                    input_senders,
493                    output_receivers,
494                    cluster_input_senders,
495                    cluster_output_receivers,
496                    external_registered: self.externals_port_registry.registered.clone(),
497                    quiescence: quiescence.clone(),
498                }),
499                async move {
500                    thunk(self).await;
501                },
502            ))
503            .await;
504    }
505
506    /// Launches the simulation, which will asynchronously simulate the Hydro program. This should
507    /// be invoked but before receiving any messages.
508    fn launch(self) {
509        tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
510    }
511
512    /// Returns a future that schedules simulation with the given logger for reporting the
513    /// simulation trace.
514    pub fn schedule_with_logger<W: std::io::Write>(
515        self,
516        log_writer: W,
517    ) -> impl use<W> + Future<Output = ()> {
518        self.schedule_with_maybe_logger(Some(log_writer))
519    }
520
521    fn schedule_with_maybe_logger<W: std::io::Write>(
522        mut self,
523        log_override: Option<W>,
524    ) -> impl use<W> + Future<Output = ()> {
525        let (async_dfirs, tick_dfirs, hooks, inline_hooks) = self.dylib_result.take().unwrap();
526
527        let not_ready_observation = async_dfirs
528            .iter()
529            .map(|(lid, c_id, _)| (serde_json::from_str(lid).unwrap(), *c_id))
530            .collect();
531
532        let quiescence = CURRENT_SIM_CONNECTIONS.with(|connections| {
533            let connections = connections.borrow();
534            connections.quiescence.clone()
535        });
536
537        let mut launched = LaunchedSim {
538            async_dfirs: async_dfirs
539                .into_iter()
540                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
541                .collect(),
542            possibly_ready_ticks: vec![],
543            not_ready_ticks: tick_dfirs
544                .into_iter()
545                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
546                .collect(),
547            possibly_ready_observation: vec![],
548            not_ready_observation,
549            hooks: hooks
550                .into_iter()
551                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
552                .collect(),
553            inline_hooks: inline_hooks
554                .into_iter()
555                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
556                .collect(),
557            log: if self.log {
558                if let Some(w) = log_override {
559                    LogKind::Custom(w)
560                } else {
561                    LogKind::Stderr
562                }
563            } else {
564                LogKind::Null
565            },
566            quiescence,
567        };
568
569        async move { launched.scheduler().await }
570    }
571}
572
573impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone for SimReceiver<T, O, R> {
574    fn clone(&self) -> Self {
575        *self
576    }
577}
578
579impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy for SimReceiver<T, O, R> {}
580
581impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimReceiver<T, O, R> {
582    async fn with_stream<Out>(
583        &self,
584        thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
585    ) -> Out {
586        let (receiver, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
587            let connections = connections.borrow();
588            let port = connections.external_registered.get(&self.0).unwrap();
589            (
590                connections.output_receivers.get(port).unwrap().clone(),
591                connections.quiescence.clone(),
592            )
593        });
594
595        let mut receiver_stream = receiver.lock().await;
596        let mut notified_fut = pin!(quiescence.notified());
597        let mut quiescence_aware = futures::stream::poll_fn(|cx| {
598            use std::task::Poll;
599            match receiver_stream.poll_next_unpin(cx) {
600                Poll::Ready(Some(bytes)) => {
601                    return Poll::Ready(Some(bincode::deserialize(&bytes).unwrap()));
602                }
603                Poll::Ready(None) => return Poll::Ready(None),
604                Poll::Pending => {}
605            }
606            if quiescence.is_quiescent() {
607                return Poll::Ready(None);
608            }
609            let () = ready!(notified_fut.as_mut().poll(cx));
610            notified_fut.set(quiescence.notified());
611            Poll::Ready(None)
612        });
613        thunk(&mut pin!(&mut quiescence_aware)).await
614    }
615
616    /// Asserts that the stream has ended and no more messages can possibly arrive.
617    pub fn assert_no_more(self) -> impl Future<Output = ()>
618    where
619        T: Debug,
620    {
621        FutureTrackingCaller {
622            future: async move {
623                self.with_stream(async |stream| {
624                    if let Some(next) = stream.next().await {
625                        return Err(format!(
626                            "Stream yielded unexpected message: {:?}, expected termination",
627                            next
628                        ));
629                    }
630                    Ok(())
631                })
632                .await
633            },
634        }
635    }
636}
637
638impl<T: Serialize + DeserializeOwned> SimReceiver<T, TotalOrder, ExactlyOnce> {
639    /// Receives the next message from the external bincode stream. This will wait until a message
640    /// is available, or return `None` if no more messages can possibly arrive.
641    pub async fn next(&self) -> Option<T> {
642        self.with_stream(async |stream| stream.next().await).await
643    }
644
645    /// Collects all remaining messages from the external bincode stream into a collection. This
646    /// will wait until no more messages can possibly arrive.
647    pub async fn collect<C: Default + Extend<T>>(self) -> C {
648        self.with_stream(async |stream| stream.collect().await)
649            .await
650    }
651
652    /// Asserts that the stream yields exactly the expected sequence of messages, in order.
653    /// This does not check that the stream ends, use [`Self::assert_yields_only`] for that.
654    pub fn assert_yields<T2: Debug, I: IntoIterator<Item = T2>>(
655        &self,
656        expected: I,
657    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
658    where
659        T: Debug + PartialEq<T2>,
660    {
661        FutureTrackingCaller {
662            future: async {
663                let mut expected: VecDeque<T2> = expected.into_iter().collect();
664
665                while !expected.is_empty() {
666                    if let Some(next) = self.next().await {
667                        let next_expected = expected.pop_front().unwrap();
668                        if next != next_expected {
669                            return Err(format!(
670                                "Stream yielded unexpected message: {:?}, expected: {:?}",
671                                next, next_expected
672                            ));
673                        }
674                    } else {
675                        return Err(format!(
676                            "Stream ended early, still expected: {:?}",
677                            expected
678                        ));
679                    }
680                }
681
682                Ok(())
683            },
684        }
685    }
686
687    /// Asserts that the stream yields only the expected sequence of messages, in order,
688    /// and then ends.
689    pub fn assert_yields_only<T2: Debug, I: IntoIterator<Item = T2>>(
690        &self,
691        expected: I,
692    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
693    where
694        T: Debug + PartialEq<T2>,
695    {
696        ChainedFuture {
697            first: self.assert_yields(expected),
698            second: self.assert_no_more(),
699            first_done: false,
700        }
701    }
702}
703
704pin_project_lite::pin_project! {
705    // A future that tracks the location of the `.await` call for better panic messages.
706    //
707    // `#[track_caller]` is important for us to create assertion methods because it makes
708    // the panic backtrace show up at that method (instead of inside the call tree within
709    // that method). This is e.g. what `Option::unwrap` uses. Unfortunately, `#[track_caller]`
710    // does not work correctly for async methods (or `dyn Future` either), so we have to
711    // create these concrete future types that (1) have `#[track_caller]` on their `poll()`
712    // method and (2) have the `panic!` triggered in their `poll()` method (or in a directly
713    // nested concrete future).
714    struct FutureTrackingCaller<F: Future<Output = Result<(), String>>> {
715        #[pin]
716        future: F,
717    }
718}
719
720impl<F: Future<Output = Result<(), String>>> Future for FutureTrackingCaller<F> {
721    type Output = ();
722
723    #[track_caller]
724    fn poll(
725        mut self: Pin<&mut Self>,
726        cx: &mut std::task::Context<'_>,
727    ) -> std::task::Poll<Self::Output> {
728        match ready!(self.as_mut().project().future.poll(cx)) {
729            Ok(()) => std::task::Poll::Ready(()),
730            Err(e) => panic!("{}", e),
731        }
732    }
733}
734
735pin_project_lite::pin_project! {
736    // A future that first awaits the first future, then the second, propagating caller info.
737    //
738    // See [`FutureTrackingCaller`] for context.
739    struct ChainedFuture<F1: Future<Output = ()>, F2: Future<Output = ()>> {
740        #[pin]
741        first: F1,
742        #[pin]
743        second: F2,
744        first_done: bool,
745    }
746}
747
748impl<F1: Future<Output = ()>, F2: Future<Output = ()>> Future for ChainedFuture<F1, F2> {
749    type Output = ();
750
751    #[track_caller]
752    fn poll(
753        mut self: Pin<&mut Self>,
754        cx: &mut std::task::Context<'_>,
755    ) -> std::task::Poll<Self::Output> {
756        if !self.first_done {
757            ready!(self.as_mut().project().first.poll(cx));
758            *self.as_mut().project().first_done = true;
759        }
760
761        self.as_mut().project().second.poll(cx)
762    }
763}
764
765impl<T: Serialize + DeserializeOwned> SimReceiver<T, NoOrder, ExactlyOnce> {
766    /// Collects all remaining messages from the external bincode stream into a collection,
767    /// sorting them. This will wait until no more messages can possibly arrive.
768    pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
769    where
770        T: Ord,
771    {
772        self.with_stream(async |stream| {
773            let mut collected: C = stream.collect().await;
774            collected.as_mut().sort();
775            collected
776        })
777        .await
778    }
779
780    /// Asserts that the stream yields exactly the expected sequence of messages, in some order.
781    /// This does not check that the stream ends, use [`Self::assert_yields_only_unordered`] for that.
782    pub fn assert_yields_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
783        &self,
784        expected: I,
785    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
786    where
787        T: Debug + PartialEq<T2>,
788    {
789        FutureTrackingCaller {
790            future: async {
791                self.with_stream(async |stream| {
792                    let mut expected: Vec<T2> = expected.into_iter().collect();
793
794                    while !expected.is_empty() {
795                        if let Some(next) = stream.next().await {
796                            let idx = expected.iter().enumerate().find(|(_, e)| &next == *e);
797                            if let Some((i, _)) = idx {
798                                expected.swap_remove(i);
799                            } else {
800                                return Err(format!(
801                                    "Stream yielded unexpected message: {:?}",
802                                    next
803                                ));
804                            }
805                        } else {
806                            return Err(format!(
807                                "Stream ended early, still expected: {:?}",
808                                expected
809                            ));
810                        }
811                    }
812
813                    Ok(())
814                })
815                .await
816            },
817        }
818    }
819
820    /// Asserts that the stream yields only the expected sequence of messages, in some order,
821    /// and then ends.
822    pub fn assert_yields_only_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
823        &self,
824        expected: I,
825    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
826    where
827        T: Debug + PartialEq<T2>,
828    {
829        ChainedFuture {
830            first: self.assert_yields_unordered(expected),
831            second: self.assert_no_more(),
832            first_done: false,
833        }
834    }
835}
836
837impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimSender<T, O, R> {
838    fn with_sink<Out>(
839        &self,
840        thunk: impl FnOnce(&dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>) -> Out,
841    ) -> Out {
842        let (sender, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
843            let connections = connections.borrow();
844            (
845                connections
846                    .input_senders
847                    .get(connections.external_registered.get(&self.0).unwrap())
848                    .unwrap()
849                    .clone(),
850                connections.quiescence.clone(),
851            )
852        });
853
854        thunk(&move |t| {
855            let res = sender.send(bincode::serialize(&t).unwrap().into());
856            quiescence.resume();
857            res
858        })
859    }
860}
861
862impl<T: Serialize + DeserializeOwned, O: Ordering> SimSender<T, O, ExactlyOnce> {
863    /// Sends several messages to the external bincode sink. The messages will be asynchronously
864    /// processed as part of the simulation, in non-deterministic order.
865    pub fn send_many_unordered<I: IntoIterator<Item = T>>(&self, iter: I) {
866        self.with_sink(|send| {
867            for t in iter {
868                send(t).unwrap();
869            }
870        })
871    }
872}
873
874impl<T: Serialize + DeserializeOwned> SimSender<T, TotalOrder, ExactlyOnce> {
875    /// Sends a message to the external bincode sink. The message will be asynchronously processed
876    /// as part of the simulation.
877    pub fn send(&self, t: T) {
878        self.with_sink(|send| send(t)).unwrap();
879    }
880
881    /// Sends several messages to the external bincode sink. The messages will be asynchronously
882    /// processed as part of the simulation.
883    pub fn send_many<I: IntoIterator<Item = T>>(&self, iter: I) {
884        self.with_sink(|send| {
885            for t in iter {
886                send(t).unwrap();
887            }
888        })
889    }
890}
891
892impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone
893    for SimClusterReceiver<T, O, R>
894{
895    fn clone(&self) -> Self {
896        *self
897    }
898}
899
900impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy
901    for SimClusterReceiver<T, O, R>
902{
903}
904
905impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimClusterReceiver<T, O, R> {
906    async fn with_member_stream<Out>(
907        &self,
908        member_id: u32,
909        thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
910    ) -> Out {
911        let (receiver, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
912            let connections = connections.borrow();
913            let port = connections.external_registered.get(&self.0).unwrap();
914            let receivers = connections.cluster_output_receivers.get(port).unwrap();
915            (
916                receivers[member_id as usize].clone(),
917                connections.quiescence.clone(),
918            )
919        });
920
921        let mut lock = receiver.lock().await;
922        let mut notified_fut = pin!(quiescence.notified());
923        let mut quiescence_aware = futures::stream::poll_fn(|cx| {
924            use std::task::Poll;
925            match lock.poll_next_unpin(cx) {
926                Poll::Ready(Some(bytes)) => {
927                    return Poll::Ready(Some(bincode::deserialize(&bytes).unwrap()));
928                }
929                Poll::Ready(None) => return Poll::Ready(None),
930                Poll::Pending => {}
931            }
932            if quiescence.is_quiescent() {
933                return Poll::Ready(None);
934            }
935            let () = ready!(notified_fut.as_mut().poll(cx));
936            notified_fut.set(quiescence.notified());
937            Poll::Ready(None)
938        });
939        thunk(&mut pin!(&mut quiescence_aware)).await
940    }
941}
942
943impl<T: Serialize + DeserializeOwned> SimClusterReceiver<T, TotalOrder, ExactlyOnce> {
944    /// Receives the next value from a specific cluster member.
945    pub async fn next(&self, member_id: u32) -> Option<T> {
946        self.with_member_stream(member_id, async |stream| stream.next().await)
947            .await
948    }
949
950    /// Collects all remaining values from a specific cluster member into a collection.
951    pub async fn collect<C: Default + Extend<T>>(self, member_id: u32) -> C {
952        self.with_member_stream(member_id, async |stream| stream.collect().await)
953            .await
954    }
955}
956
957impl<T: Serialize + DeserializeOwned> SimClusterReceiver<T, NoOrder, ExactlyOnce> {
958    /// Collects all remaining values from a specific cluster member, sorted.
959    pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self, member_id: u32) -> C
960    where
961        T: Ord,
962    {
963        self.with_member_stream(member_id, async |stream| {
964            let mut collected: C = stream.collect().await;
965            collected.as_mut().sort();
966            collected
967        })
968        .await
969    }
970}
971
972impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimClusterSender<T, O, R> {
973    fn with_sink<Out>(
974        &self,
975        thunk: impl FnOnce(
976            &dyn Fn(u32, T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>,
977        ) -> Out,
978    ) -> Out {
979        let (senders, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
980            let connections = connections.borrow();
981            (
982                connections
983                    .cluster_input_senders
984                    .get(connections.external_registered.get(&self.0).unwrap())
985                    .unwrap()
986                    .clone(),
987                connections.quiescence.clone(),
988            )
989        });
990
991        thunk(&move |member_id: u32, t: T| {
992            let payload = bincode::serialize(&t).unwrap();
993            let res = senders[member_id as usize].send(Bytes::from(payload));
994            quiescence.resume();
995            res
996        })
997    }
998}
999
1000impl<T: Serialize + DeserializeOwned> SimClusterSender<T, TotalOrder, ExactlyOnce> {
1001    /// Sends a value to a specific cluster member.
1002    pub fn send(&self, member_id: u32, t: T) {
1003        self.with_sink(|send| send(member_id, t)).unwrap();
1004    }
1005
1006    /// Sends multiple values to specific cluster members.
1007    pub fn send_many<I: IntoIterator<Item = (u32, T)>>(&self, iter: I) {
1008        self.with_sink(|send| {
1009            for (member_id, t) in iter {
1010                send(member_id, t).unwrap();
1011            }
1012        })
1013    }
1014}
1015
/// Destination for the simulator's decision log.
enum LogKind<W: std::io::Write> {
    /// Discard all log output.
    Null,
    /// Print log output to standard error.
    Stderr,
    /// Forward log output to a caller-provided byte sink.
    Custom(W),
}

// Lets a single `fmt::Write` target cover "nothing", stderr, and any `io::Write` sink.
// via https://www.reddit.com/r/rust/comments/t69sld/is_there_a_way_to_allow_either_stdfmtwrite_or/
impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
    /// Writes `s` to the selected destination.
    ///
    /// `Null` and `Stderr` always succeed. For `Custom`, any I/O error from the sink is
    /// reported as `std::fmt::Error`; the underlying I/O error is dropped.
    fn write_str(&mut self, s: &str) -> Result<(), std::fmt::Error> {
        let sink = match self {
            LogKind::Null => return Ok(()),
            LogKind::Stderr => {
                eprint!("{}", s);
                return Ok(());
            }
            LogKind::Custom(sink) => sink,
        };

        sink.write_all(s.as_bytes()).map_err(|_| std::fmt::Error)
    }
}
1035
1036/// A running simulation, which manages the async DFIRs, tick DFIRs, and hook-based
1037/// scheduling decisions for non-deterministic operators like `batch` and `assume_ordering`.
1038///
1039/// The scheduler loops between three kinds of work:
1040/// - **Async DFIRs**: long-running top-level dataflows (one per process/cluster member) that
1041///   produce data consumed by ticks and observations.
1042/// - **Ticks**: tick-scoped DFIRs that execute a single tick. Before running, their associated
1043///   hooks (e.g. from `batch`) are resolved to decide what data to release into the tick.
1044/// - **Observations**: top-level locations that have hooks (e.g. from `assume_ordering` on a
1045///   non-tick stream) needing decisions, but no tick DFIR to execute. The scheduler just
1046///   resolves their hooks.
1047struct LaunchedSim<W: std::io::Write> {
1048    /// Top-level async DFIRs, one per process/cluster member. These run continuously and
1049    /// produce data that feeds into ticks and observations.
1050    async_dfirs: Vec<(LocationId, Option<u32>, DfirErased)>,
1051    /// Tick DFIRs whose parent async DFIR has made progress, so they may be ready to run.
1052    /// The scheduler further filters these by checking whether their hooks have pending decisions.
1053    possibly_ready_ticks: Vec<(LocationId, Option<u32>, DfirErased)>,
1054    /// Tick DFIRs whose parent async DFIR has not yet made progress since they were last checked.
1055    not_ready_ticks: Vec<(LocationId, Option<u32>, DfirErased)>,
1056    /// Top-level locations whose async DFIR has made progress and whose hooks (from top-level
1057    /// `assume_ordering`) may have ordering decisions to resolve. Unlike ticks, these have no
1058    /// DFIR to execute — only hook resolution.
1059    possibly_ready_observation: Vec<(LocationId, Option<u32>)>,
1060    /// Top-level locations whose async DFIR has not yet made progress since they were last checked.
1061    not_ready_observation: Vec<(LocationId, Option<u32>)>,
1062    /// Hooks keyed by (location, cluster_member_id). These are resolved *before* a tick runs
1063    /// (for `batch` hooks) or standalone (for top-level `assume_ordering` hooks via observations).
1064    hooks: Hooks<LocationId>,
1065    /// Inline hooks keyed by (tick location, cluster_member_id). These are resolved *during*
1066    /// tick execution via a `tokio::select!` loop, for operators like `assume_ordering` inside
1067    /// a tick that block on ordering decisions while the tick DFIR is running.
1068    inline_hooks: InlineHooks<LocationId>,
1069    log: LogKind<W>,
1070    /// Represents quiescence state of the simulation.
1071    quiescence: Rc<QuiescenceState>,
1072}
1073
1074impl<W: std::io::Write> LaunchedSim<W> {
1075    async fn scheduler(&mut self) {
1076        loop {
1077            tokio::task::yield_now().await;
1078            let mut any_made_progress = false;
1079            for (loc, c_id, dfir) in &mut self.async_dfirs {
1080                if dfir.run_tick().await {
1081                    any_made_progress = true;
1082                    let (now_ready, still_not_ready): (Vec<_>, Vec<_>) = self
1083                        .not_ready_ticks
1084                        .drain(..)
1085                        .partition(|(tick_loc, tick_c_id, _)| {
1086                            let LocationId::Tick(_, outer) = tick_loc else {
1087                                unreachable!()
1088                            };
1089                            outer.as_ref() == loc && tick_c_id == c_id
1090                        });
1091
1092                    self.possibly_ready_ticks.extend(now_ready);
1093                    self.not_ready_ticks.extend(still_not_ready);
1094
1095                    let (now_ready_obs, still_not_ready_obs): (Vec<_>, Vec<_>) = self
1096                        .not_ready_observation
1097                        .drain(..)
1098                        .partition(|(obs_loc, obs_c_id)| obs_loc == loc && obs_c_id == c_id);
1099
1100                    self.possibly_ready_observation.extend(now_ready_obs);
1101                    self.not_ready_observation.extend(still_not_ready_obs);
1102                }
1103            }
1104
1105            if any_made_progress {
1106                continue;
1107            } else {
1108                use bolero::generator::*;
1109
1110                let (ready_tick, mut not_ready_tick): (Vec<_>, Vec<_>) = self
1111                    .possibly_ready_ticks
1112                    .drain(..)
1113                    .partition(|(name, cid, _)| {
1114                        let hooks = self.hooks.get(&(name.clone(), *cid)).unwrap();
1115                        // All hooks must be ready (have received input or have a last value)
1116                        hooks.iter().all(|hook| hook.is_ready())
1117                            // And at least one hook must be able to make progress
1118                            && hooks.iter().any(|hook| {
1119                                hook.current_decision().unwrap_or(false)
1120                                    || hook.can_make_nontrivial_decision()
1121                            })
1122                    });
1123
1124                self.possibly_ready_ticks = ready_tick;
1125                self.not_ready_ticks.append(&mut not_ready_tick);
1126
1127                let (ready_obs, mut not_ready_obs): (Vec<_>, Vec<_>) = self
1128                    .possibly_ready_observation
1129                    .drain(..)
1130                    .partition(|(name, cid)| {
1131                        self.hooks
1132                            .get(&(name.clone(), *cid))
1133                            .into_iter()
1134                            .flatten()
1135                            .any(|hook| {
1136                                hook.current_decision().unwrap_or(false)
1137                                    || hook.can_make_nontrivial_decision()
1138                            })
1139                    });
1140
1141                self.possibly_ready_observation = ready_obs;
1142                self.not_ready_observation.append(&mut not_ready_obs);
1143
1144                if self.possibly_ready_ticks.is_empty()
1145                    && self.possibly_ready_observation.is_empty()
1146                {
1147                    // If any tick is blocked because a hook is not ready, that's a
1148                    // simulator bug — it means a singleton never received a value.
1149                    for (name, cid, _) in &self.not_ready_ticks {
1150                        let hooks = self.hooks.get(&(name.clone(), *cid)).unwrap();
1151                        abort_assert!(
1152                            hooks.iter().all(|hook| hook.is_ready()),
1153                            "tick has a hook that never became ready"
1154                        );
1155                    }
1156
1157                    // Signal quiescence and wait for new input.
1158                    self.quiescence.wait_for_resume().await;
1159                } else {
1160                    let next_tick_or_obs = (0..(self.possibly_ready_ticks.len()
1161                        + self.possibly_ready_observation.len()))
1162                        .any();
1163
1164                    if next_tick_or_obs < self.possibly_ready_ticks.len() {
1165                        let next_tick = next_tick_or_obs;
1166                        let mut removed = self.possibly_ready_ticks.remove(next_tick);
1167
1168                        match &mut self.log {
1169                            LogKind::Null => {}
1170                            LogKind::Stderr => {
1171                                if let Some(cid) = &removed.1 {
1172                                    eprintln!(
1173                                        "\n{}",
1174                                        format!("Running Tick (Cluster Member {})", cid)
1175                                            .color(colored::Color::Magenta)
1176                                            .bold()
1177                                    )
1178                                } else {
1179                                    eprintln!(
1180                                        "\n{}",
1181                                        "Running Tick".color(colored::Color::Magenta).bold()
1182                                    )
1183                                }
1184                            }
1185                            LogKind::Custom(writer) => {
1186                                writeln!(
1187                                    writer,
1188                                    "\n{}",
1189                                    "Running Tick".color(colored::Color::Magenta).bold()
1190                                )
1191                                .unwrap();
1192                            }
1193                        }
1194
1195                        let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
1196                            write.write_str(&"*".color(colored::Color::Magenta).bold())?;
1197                            write.write_str(" ")
1198                        };
1199
1200                        let mut tick_decision_writer = indenter::indented(&mut self.log)
1201                            .with_format(indenter::Format::Custom {
1202                                inserter: &mut asterisk_indenter,
1203                            });
1204
1205                        let hooks = self.hooks.get_mut(&(removed.0.clone(), removed.1)).unwrap();
1206                        run_hooks(&mut tick_decision_writer, hooks);
1207
1208                        let run_tick_future = removed.2.run_tick();
1209                        if let Some(inline_hooks) =
1210                            self.inline_hooks.get_mut(&(removed.0.clone(), removed.1))
1211                        {
1212                            let mut run_tick_future_pinned = pin!(run_tick_future);
1213
1214                            loop {
1215                                tokio::select! {
1216                                    biased;
1217                                    r = &mut run_tick_future_pinned => {
1218                                        abort_assert!(r, "tick DFIR run_tick() returned false");
1219                                        break;
1220                                    }
1221                                    _ = async {} => {
1222                                        bolero_generator::any::scope::borrow_with(|driver| {
1223                                            for hook in inline_hooks.iter_mut() {
1224                                                if hook.pending_decision() {
1225                                                    if !hook.has_decision() {
1226                                                        hook.autonomous_decision(driver);
1227                                                    }
1228
1229                                                    hook.release_decision(&mut tick_decision_writer);
1230                                                }
1231                                            }
1232                                        });
1233                                    }
1234                                }
1235                            }
1236                        } else {
1237                            abort_assert!(
1238                                run_tick_future.await,
1239                                "tick DFIR run_tick() returned false"
1240                            );
1241                        }
1242
1243                        self.possibly_ready_ticks.push(removed);
1244                    } else {
1245                        let next_obs = next_tick_or_obs - self.possibly_ready_ticks.len();
1246                        let mut default_hooks = vec![];
1247                        let hooks = self
1248                            .hooks
1249                            .get_mut(&self.possibly_ready_observation[next_obs])
1250                            .unwrap_or(&mut default_hooks);
1251
1252                        run_hooks(&mut self.log, hooks);
1253                    }
1254                }
1255            }
1256        }
1257    }
1258}
1259
1260fn run_hooks(tick_decision_writer: &mut impl std::fmt::Write, hooks: &mut Vec<Box<dyn SimHook>>) {
1261    let mut remaining_decision_count = hooks.len();
1262    let mut made_nontrivial_decision = false;
1263
1264    bolero::generator::bolero_generator::any::scope::borrow_with(|driver| {
1265        // first, scan manual decisions
1266        hooks.iter_mut().for_each(|hook| {
1267            if let Some(is_nontrivial) = hook.current_decision() {
1268                made_nontrivial_decision |= is_nontrivial;
1269                remaining_decision_count -= 1;
1270            } else if !hook.can_make_nontrivial_decision() {
1271                // if no nontrivial decision is possible, make a trivial one
1272                // (we need to do this in the first pass to force nontrivial decisions
1273                // on the remaining hooks)
1274                hook.autonomous_decision(driver, false);
1275                remaining_decision_count -= 1;
1276            }
1277        });
1278
1279        hooks.iter_mut().for_each(|hook| {
1280            if hook.current_decision().is_none() {
1281                made_nontrivial_decision |= hook.autonomous_decision(
1282                    driver,
1283                    !made_nontrivial_decision && remaining_decision_count == 1,
1284                );
1285                remaining_decision_count -= 1;
1286            }
1287
1288            hook.release_decision(tick_decision_writer);
1289        });
1290    });
1291}