Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4use std::fmt::{Debug, Display};
5use std::hash::{Hash, Hasher};
6use std::ops::Deref;
7use std::rc::Rc;
8
9#[cfg(feature = "build")]
10use dfir_lang::graph::FlatGraphBuilder;
11#[cfg(feature = "build")]
12use proc_macro2::Span;
13use proc_macro2::TokenStream;
14use quote::ToTokens;
15#[cfg(feature = "build")]
16use quote::quote;
17#[cfg(feature = "build")]
18use slotmap::{SecondaryMap, SparseSecondaryMap};
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24use crate::compile::builder::{CycleId, ExternalPortId};
25#[cfg(feature = "build")]
26use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
27use crate::location::dynamic::LocationId;
28use crate::location::{LocationKey, NetworkHint};
29
30pub mod backtrace;
31use backtrace::Backtrace;
32
/// Wrapper around a parsed expression whose [`Debug`] output is only the expression's tokens.
///
/// Boxes the `syn::Expr` (noted as ~240 bytes) so the wrapper itself is a single pointer.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
38
39impl From<syn::Expr> for DebugExpr {
40    fn from(expr: syn::Expr) -> Self {
41        Self(Box::new(expr))
42    }
43}
44
45impl Deref for DebugExpr {
46    type Target = syn::Expr;
47
48    fn deref(&self) -> &Self::Target {
49        &self.0
50    }
51}
52
53impl ToTokens for DebugExpr {
54    fn to_tokens(&self, tokens: &mut TokenStream) {
55        self.0.to_tokens(tokens);
56    }
57}
58
59impl Debug for DebugExpr {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        write!(f, "{}", self.0.to_token_stream())
62    }
63}
64
65impl Display for DebugExpr {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        let original = self.0.as_ref().clone();
68        let simplified = simplify_q_macro(original);
69
70        // For now, just use quote formatting without trying to parse as a statement
71        // This avoids the syn::parse_quote! issues entirely
72        write!(f, "q!({})", quote::quote!(#simplified))
73    }
74}
75
76/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
77fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
78    // Try to parse the token string as a syn::Expr
79    // Use a visitor to simplify q! macro expansions
80    let mut simplifier = QMacroSimplifier::new();
81    simplifier.visit_expr_mut(&mut expr);
82
83    // If we found and simplified a q! macro, return the simplified version
84    if let Some(simplified) = simplifier.simplified_result {
85        simplified
86    } else {
87        expr
88    }
89}
90
/// AST visitor that simplifies `q!` macro expansions.
///
/// Rather than rewriting the visited tree, the visitor records the first replacement it
/// extracts in `simplified_result` and stops looking once that field is set.
#[derive(Default)]
pub struct QMacroSimplifier {
    /// The expression extracted from the first matching call, or `None` if nothing matched.
    pub simplified_result: Option<syn::Expr>,
}
96
97impl QMacroSimplifier {
98    pub fn new() -> Self {
99        Self::default()
100    }
101}
102
103impl VisitMut for QMacroSimplifier {
104    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
105        // Check if we already found a result to avoid further processing
106        if self.simplified_result.is_some() {
107            return;
108        }
109
110        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
111            // Look for calls to stageleft::runtime_support::fn*
112            && self.is_stageleft_runtime_support_call(&path_expr.path)
113            // Try to extract the closure from the arguments
114            && let Some(closure) = self.extract_closure_from_args(&call.args)
115        {
116            self.simplified_result = Some(closure);
117            return;
118        }
119
120        // Continue visiting child expressions using the default implementation
121        // Use the default visitor to avoid infinite recursion
122        syn::visit_mut::visit_expr_mut(self, expr);
123    }
124}
125
126impl QMacroSimplifier {
127    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
128        // Check if this is a call to stageleft::runtime_support::fn*
129        if let Some(last_segment) = path.segments.last() {
130            let fn_name = last_segment.ident.to_string();
131            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
132            fn_name.contains("_type_hint")
133                && path.segments.len() > 2
134                && path.segments[0].ident == "stageleft"
135                && path.segments[1].ident == "runtime_support"
136        } else {
137            false
138        }
139    }
140
141    fn extract_closure_from_args(
142        &self,
143        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
144    ) -> Option<syn::Expr> {
145        // Look through the arguments for a closure expression
146        for arg in args {
147            if let syn::Expr::Closure(_) = arg {
148                return Some(arg.clone());
149            }
150            // Also check for closures nested in other expressions (like blocks)
151            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
152                return Some(closure_expr);
153            }
154        }
155        None
156    }
157
158    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
159        let mut visitor = ClosureFinder {
160            found_closure: None,
161            prefer_inner_blocks: true,
162        };
163        visitor.visit_expr(expr);
164        visitor.found_closure
165    }
166}
167
/// Visitor that finds closures in expressions with special block handling
struct ClosureFinder {
    /// The first closure found (or the inner block containing it), once the search has matched.
    found_closure: Option<syn::Expr>,
    /// When `true`, a block whose direct child statement is a closure-containing block
    /// yields that inner block instead of the bare closure.
    prefer_inner_blocks: bool,
}
173
174impl<'ast> Visit<'ast> for ClosureFinder {
175    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
176        // If we already found a closure, don't continue searching
177        if self.found_closure.is_some() {
178            return;
179        }
180
181        match expr {
182            syn::Expr::Closure(_) => {
183                self.found_closure = Some(expr.clone());
184            }
185            syn::Expr::Block(block) if self.prefer_inner_blocks => {
186                // Special handling for blocks - look for inner blocks that contain closures
187                for stmt in &block.block.stmts {
188                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
189                        && let syn::Expr::Block(_) = stmt_expr
190                    {
191                        // Check if this nested block contains a closure
192                        let mut inner_visitor = ClosureFinder {
193                            found_closure: None,
194                            prefer_inner_blocks: false, // Avoid infinite recursion
195                        };
196                        inner_visitor.visit_expr(stmt_expr);
197                        if inner_visitor.found_closure.is_some() {
198                            // Found a closure in an inner block, return that block
199                            self.found_closure = Some(stmt_expr.clone());
200                            return;
201                        }
202                    }
203                }
204
205                // If no inner block with closure found, continue with normal visitation
206                visit::visit_expr(self, expr);
207
208                // If we found a closure, just return the closure itself, not the whole block
209                // unless we're in the special case where we want the containing block
210                if self.found_closure.is_some() {
211                    // The closure was found during visitation, no need to wrap in block
212                }
213            }
214            _ => {
215                // Use default visitor behavior for all other expressions
216                visit::visit_expr(self, expr);
217            }
218        }
219    }
220}
221
/// Wrapper around a parsed type whose [`Debug`] output is the type's tokens.
///
/// Boxes `syn::Type` which is ~320 bytes.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
227
228impl From<syn::Type> for DebugType {
229    fn from(t: syn::Type) -> Self {
230        Self(Box::new(t))
231    }
232}
233
234impl Deref for DebugType {
235    type Target = syn::Type;
236
237    fn deref(&self) -> &Self::Target {
238        &self.0
239    }
240}
241
242impl ToTokens for DebugType {
243    fn to_tokens(&self, tokens: &mut TokenStream) {
244        self.0.to_tokens(tokens);
245    }
246}
247
248impl Debug for DebugType {
249    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
250        write!(f, "{}", self.0.to_token_stream())
251    }
252}
253
/// Instantiation state of a network or external connection in the IR.
///
/// The `Debug`, `Hash`, and `Clone` impls in this file treat it as opaque: it prints a
/// fixed placeholder, contributes nothing to hashes, and can only be cloned while
/// `Building`.
pub enum DebugInstantiate {
    /// The connection has not been instantiated yet.
    Building,
    /// The connection has been instantiated; boxed to keep this enum small.
    Finalized(Box<DebugInstantiateFinalized>),
}
258
/// Payload of [`DebugInstantiate::Finalized`]: the expressions and connection hook
/// produced when a connection is instantiated.
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side of the connection.
    sink: syn::Expr,
    /// Expression for the receiving side of the connection.
    source: syn::Expr,
    /// One-shot callback associated with the connection, if any.
    /// NOTE(review): presumably run once to wire up the deployment — confirm at the call site.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
271
272impl From<DebugInstantiateFinalized> for DebugInstantiate {
273    fn from(f: DebugInstantiateFinalized) -> Self {
274        Self::Finalized(Box::new(f))
275    }
276}
277
278impl Debug for DebugInstantiate {
279    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280        write!(f, "<network instantiate>")
281    }
282}
283
284impl Hash for DebugInstantiate {
285    fn hash<H: Hasher>(&self, _state: &mut H) {
286        // Do nothing
287    }
288}
289
290impl Clone for DebugInstantiate {
291    fn clone(&self) -> Self {
292        match self {
293            DebugInstantiate::Building => DebugInstantiate::Building,
294            DebugInstantiate::Finalized(_) => {
295                panic!("DebugInstantiate::Finalized should not be cloned")
296            }
297        }
298    }
299}
300
/// A source in a Hydro graph, where data enters the graph.
///
/// NOTE(review): the code that compiles each variant is outside this section; the
/// per-variant notes below only describe the payload each variant carries.
#[derive(Debug, Hash, Clone)]
pub enum HydroSource {
    /// Carries an expression (presumably evaluating to a stream — confirm).
    Stream(DebugExpr),
    /// Carries no payload.
    ExternalNetwork(),
    /// Carries an expression (presumably evaluating to an iterable — confirm).
    Iter(DebugExpr),
    /// Carries no payload.
    Spin(),
    /// Carries the location whose cluster members are referenced.
    ClusterMembers(LocationId),
    /// Carries the identifier naming the embedded source.
    Embedded(syn::Ident),
}
311
#[cfg(feature = "build")]
/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
/// and simulations.
///
/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
pub trait DfirBuilder {
    /// Whether the representation of singletons should include intermediate states.
    fn singleton_intermediates(&self) -> bool;

    /// Gets the DFIR builder for the given location, creating it if necessary.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the DFIR that binds `out_ident` to `in_ident` for a batch operation.
    ///
    /// The `SecondaryMap` implementation in this file forwards the input unchanged, except
    /// that bounded singleton-like kinds get a `persist::<'static>()` inserted.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the DFIR that binds `out_ident` to `in_ident` when a collection is yielded
    /// out of a tick.
    ///
    /// The `SecondaryMap` implementation in this file forwards the input unchanged.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits the DFIR that binds `out_ident` to `in_ident` at the start of an atomic region.
    ///
    /// The `SecondaryMap` implementation in this file forwards the input unchanged.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the DFIR that binds `out_ident` to `in_ident` at the end of an atomic region.
    ///
    /// The `SecondaryMap` implementation in this file forwards the input unchanged.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the DFIR that binds `out_ident` to `in_ident` at a point where
    /// non-determinism is observed.
    ///
    /// The `SecondaryMap` implementation in this file forwards the input unchanged and
    /// ignores `trusted`, both kinds, and `op_meta`.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits both halves of a network edge: a send pipeline on `from` that feeds
    /// `input_ident` (optionally through `serialize`) into `sink`, and a receive pipeline
    /// on `to` that binds `out_ident` to `source` (optionally through `deserialize`).
    ///
    /// `tag_id` distinguishes the operator tags of the generated send/receive statements.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Emits a receive pipeline on `on` that binds `out_ident` to `source_expr`,
    /// optionally mapped through `deserialize`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Emits a send pipeline on `on` that feeds `input_ident`, optionally mapped through
    /// `serialize`, into `sink_expr`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    );
}
404
/// Production code-gen: one `FlatGraphBuilder` per root location, with most structural
/// operators emitted as plain pass-throughs.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    /// Always `false` for this implementation.
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Returns the builder stored under the root key of `location`, inserting a default
    /// builder if none exists yet.
    ///
    /// # Panics
    ///
    /// Panics with "location was removed" if the map has no entry slot for the key.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        self.entry(root_key)
            .expect("location was removed")
            .or_default()
    }

    /// Forwards `in_ident` to `out_ident`; bounded singleton-like inputs are additionally
    /// routed through `persist::<'static>()` (and must live at a top-level location).
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(in_location.root());

        let singleton_like = matches!(
            in_kind,
            CollectionKind::Singleton { .. }
                | CollectionKind::Optional { .. }
                | CollectionKind::KeyedSingleton { .. }
        );

        if !(in_kind.is_bounded() && singleton_like) {
            builder.add_dfir(
                parse_quote! {
                    #out_ident = #in_ident;
                },
                None,
                None,
            );
            return;
        }

        assert!(in_location.is_top_level());
        builder.add_dfir(
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            },
            None,
            None,
        );
    }

    /// Pass-through: binds `out_ident` directly to `in_ident` on the input's root location.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Pass-through: binds `out_ident` directly to `in_ident` on the input's root location.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Pass-through: binds `out_ident` directly to `in_ident` on the input's root location.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Pass-through: binds `out_ident` directly to `in_ident` on `location`.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Emits the send half on `from` and the receive half on `to`, inserting a `map` stage
    /// on either side when the corresponding (de)serializer is provided.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        // operator tag separates send and receive, which otherwise have the same next_stmt_id
        let send_tag = format!("send{}", tag_id);
        let sender_builder = self.get_dfir_mut(from);
        match serialize {
            Some(serialize_pipeline) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }

        let recv_tag = format!("recv{}", tag_id);
        let receiver_builder = self.get_dfir_mut(to);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Emits a receive pipeline on `on` reading from `source_expr`, with an optional
    /// deserializing `map` stage.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let recv_tag = format!("recv{}", tag_id);
        let receiver_builder = self.get_dfir_mut(on);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Emits a send pipeline on `on` writing into `sink_expr`, with an optional
    /// serializing `map` stage.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        // operator tag separates send and receive, which otherwise have the same next_stmt_id
        let send_tag = format!("send{}", tag_id);
        let sender_builder = self.get_dfir_mut(on);
        match serialize {
            Some(serialize_fn) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }
    }
}
637
/// Selects what a traversal of the IR does at each element: either emit DFIR into a
/// [`DfirBuilder`], or invoke user-supplied callbacks.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Emit generated DFIR into the given builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Call `L` for roots and `N` for nodes.
    /// NOTE(review): the `&mut usize` is presumably a running statement-id counter — confirm.
    Callback(L, N),
}
647
/// A root in a Hydro graph, which is a pipeline that doesn't emit
/// any downstream values. Traversals over the dataflow graph and
/// generating DFIR IR start from roots.
///
/// Every variant owns its upstream pipeline in `input` and carries per-operator
/// metadata in `op_metadata`.
#[derive(Debug, Hash)]
pub enum HydroRoot {
    /// Root carrying an expression `f` alongside its input.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root that sends its input to a port (`to_port_id`) on an external location
    /// (`to_external_key`). `instantiate_fn` tracks whether the connection has been
    /// instantiated yet.
    SendExternal {
        to_external_key: LocationKey,
        to_port_id: ExternalPortId,
        to_many: bool,
        unpaired: bool,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root carrying a `sink` expression alongside its input.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root identified by a cycle id.
    /// NOTE(review): presumably the sink end of a forward-declared cycle — confirm.
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root identified by `ident` for an embedded output.
    EmbeddedOutput {
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
684
685impl HydroRoot {
686    #[cfg(feature = "build")]
687    pub fn compile_network<'a, D>(
688        &mut self,
689        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
690        seen_tees: &mut SeenTees,
691        processes: &SparseSecondaryMap<LocationKey, D::Process>,
692        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
693        externals: &SparseSecondaryMap<LocationKey, D::External>,
694        env: &mut D::InstantiateEnv,
695    ) where
696        D: Deploy<'a>,
697    {
698        let refcell_extra_stmts = RefCell::new(extra_stmts);
699        let refcell_env = RefCell::new(env);
700        self.transform_bottom_up(
701            &mut |l| {
702                if let HydroRoot::SendExternal {
703                    input,
704                    to_external_key,
705                    to_port_id,
706                    to_many,
707                    unpaired,
708                    instantiate_fn,
709                    ..
710                } = l
711                {
712                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
713                        DebugInstantiate::Building => {
714                            let to_node = externals
715                                .get(*to_external_key)
716                                .unwrap_or_else(|| {
717                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
718                                })
719                                .clone();
720
721                            match input.metadata().location_id.root() {
722                                &LocationId::Process(process_key) => {
723                                    if *to_many {
724                                        (
725                                            (
726                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
727                                                parse_quote!(DUMMY),
728                                            ),
729                                            Box::new(|| {}) as Box<dyn FnOnce()>,
730                                        )
731                                    } else {
732                                        let from_node = processes
733                                            .get(process_key)
734                                            .unwrap_or_else(|| {
735                                                panic!("A process used in the graph was not instantiated: {}", process_key)
736                                            })
737                                            .clone();
738
739                                        let sink_port = from_node.next_port();
740                                        let source_port = to_node.next_port();
741
742                                        if *unpaired {
743                                            use stageleft::quote_type;
744                                            use tokio_util::codec::LengthDelimitedCodec;
745
746                                            to_node.register(*to_port_id, source_port.clone());
747
748                                            let _ = D::e2o_source(
749                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
750                                                &to_node, &source_port,
751                                                &from_node, &sink_port,
752                                                &quote_type::<LengthDelimitedCodec>(),
753                                                format!("{}_{}", *to_external_key, *to_port_id)
754                                            );
755                                        }
756
757                                        (
758                                            (
759                                                D::o2e_sink(
760                                                    &from_node,
761                                                    &sink_port,
762                                                    &to_node,
763                                                    &source_port,
764                                                    format!("{}_{}", *to_external_key, *to_port_id)
765                                                ),
766                                                parse_quote!(DUMMY),
767                                            ),
768                                            if *unpaired {
769                                                D::e2o_connect(
770                                                    &to_node,
771                                                    &source_port,
772                                                    &from_node,
773                                                    &sink_port,
774                                                    *to_many,
775                                                    NetworkHint::Auto,
776                                                )
777                                            } else {
778                                                Box::new(|| {}) as Box<dyn FnOnce()>
779                                            },
780                                        )
781                                    }
782                                }
783                                LocationId::Cluster(_) => todo!(),
784                                _ => panic!()
785                            }
786                        },
787
788                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
789                    };
790
791                    *instantiate_fn = DebugInstantiateFinalized {
792                        sink: sink_expr,
793                        source: source_expr,
794                        connect_fn: Some(connect_fn),
795                    }
796                    .into();
797                } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
798                    let element_type = match &input.metadata().collection_kind {
799                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
800                        _ => panic!("Embedded output must have Stream collection kind"),
801                    };
802                    let location_key = match input.metadata().location_id.root() {
803                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
804                        _ => panic!("Embedded output must be on a process or cluster"),
805                    };
806                    D::register_embedded_output(
807                        &mut refcell_env.borrow_mut(),
808                        location_key,
809                        ident,
810                        &element_type,
811                    );
812                }
813            },
814            &mut |n| {
815                if let HydroNode::Network {
816                    name,
817                    input,
818                    instantiate_fn,
819                    metadata,
820                    ..
821                } = n
822                {
823                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
824                        DebugInstantiate::Building => instantiate_network::<D>(
825                            &mut refcell_env.borrow_mut(),
826                            input.metadata().location_id.root(),
827                            metadata.location_id.root(),
828                            processes,
829                            clusters,
830                            name.as_deref(),
831                        ),
832
833                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
834                    };
835
836                    *instantiate_fn = DebugInstantiateFinalized {
837                        sink: sink_expr,
838                        source: source_expr,
839                        connect_fn: Some(connect_fn),
840                    }
841                    .into();
842                } else if let HydroNode::ExternalInput {
843                    from_external_key,
844                    from_port_id,
845                    from_many,
846                    codec_type,
847                    port_hint,
848                    instantiate_fn,
849                    metadata,
850                    ..
851                } = n
852                {
853                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
854                        DebugInstantiate::Building => {
855                            let from_node = externals
856                                .get(*from_external_key)
857                                .unwrap_or_else(|| {
858                                    panic!(
859                                        "A external used in the graph was not instantiated: {}",
860                                        from_external_key,
861                                    )
862                                })
863                                .clone();
864
865                            match metadata.location_id.root() {
866                                &LocationId::Process(process_key) => {
867                                    let to_node = processes
868                                        .get(process_key)
869                                        .unwrap_or_else(|| {
870                                            panic!("A process used in the graph was not instantiated: {}", process_key)
871                                        })
872                                        .clone();
873
874                                    let sink_port = from_node.next_port();
875                                    let source_port = to_node.next_port();
876
877                                    from_node.register(*from_port_id, sink_port.clone());
878
879                                    (
880                                        (
881                                            parse_quote!(DUMMY),
882                                            if *from_many {
883                                                D::e2o_many_source(
884                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
885                                                    &to_node, &source_port,
886                                                    codec_type.0.as_ref(),
887                                                    format!("{}_{}", *from_external_key, *from_port_id)
888                                                )
889                                            } else {
890                                                D::e2o_source(
891                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
892                                                    &from_node, &sink_port,
893                                                    &to_node, &source_port,
894                                                    codec_type.0.as_ref(),
895                                                    format!("{}_{}", *from_external_key, *from_port_id)
896                                                )
897                                            },
898                                        ),
899                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
900                                    )
901                                }
902                                LocationId::Cluster(_) => todo!(),
903                                _ => panic!()
904                            }
905                        },
906
907                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
908                    };
909
910                    *instantiate_fn = DebugInstantiateFinalized {
911                        sink: sink_expr,
912                        source: source_expr,
913                        connect_fn: Some(connect_fn),
914                    }
915                    .into();
916                } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
917                    let element_type = match &metadata.collection_kind {
918                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
919                        _ => panic!("Embedded source must have Stream collection kind"),
920                    };
921                    let location_key = match metadata.location_id.root() {
922                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
923                        _ => panic!("Embedded source must be on a process or cluster"),
924                    };
925                    D::register_embedded_input(
926                        &mut refcell_env.borrow_mut(),
927                        location_key,
928                        ident,
929                        &element_type,
930                    );
931                }
932            },
933            seen_tees,
934            false,
935        );
936    }
937
938    pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
939        self.transform_bottom_up(
940            &mut |l| {
941                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
942                    match instantiate_fn {
943                        DebugInstantiate::Building => panic!("network not built"),
944
945                        DebugInstantiate::Finalized(finalized) => {
946                            (finalized.connect_fn.take().unwrap())();
947                        }
948                    }
949                }
950            },
951            &mut |n| {
952                if let HydroNode::Network { instantiate_fn, .. }
953                | HydroNode::ExternalInput { instantiate_fn, .. } = n
954                {
955                    match instantiate_fn {
956                        DebugInstantiate::Building => panic!("network not built"),
957
958                        DebugInstantiate::Finalized(finalized) => {
959                            (finalized.connect_fn.take().unwrap())();
960                        }
961                    }
962                }
963            },
964            seen_tees,
965            false,
966        );
967    }
968
969    pub fn transform_bottom_up(
970        &mut self,
971        transform_root: &mut impl FnMut(&mut HydroRoot),
972        transform_node: &mut impl FnMut(&mut HydroNode),
973        seen_tees: &mut SeenTees,
974        check_well_formed: bool,
975    ) {
976        self.transform_children(
977            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
978            seen_tees,
979        );
980
981        transform_root(self);
982    }
983
984    pub fn transform_children(
985        &mut self,
986        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
987        seen_tees: &mut SeenTees,
988    ) {
989        match self {
990            HydroRoot::ForEach { input, .. }
991            | HydroRoot::SendExternal { input, .. }
992            | HydroRoot::DestSink { input, .. }
993            | HydroRoot::CycleSink { input, .. }
994            | HydroRoot::EmbeddedOutput { input, .. } => {
995                transform(input, seen_tees);
996            }
997        }
998    }
999
1000    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
1001        match self {
1002            HydroRoot::ForEach {
1003                f,
1004                input,
1005                op_metadata,
1006            } => HydroRoot::ForEach {
1007                f: f.clone(),
1008                input: Box::new(input.deep_clone(seen_tees)),
1009                op_metadata: op_metadata.clone(),
1010            },
1011            HydroRoot::SendExternal {
1012                to_external_key,
1013                to_port_id,
1014                to_many,
1015                unpaired,
1016                serialize_fn,
1017                instantiate_fn,
1018                input,
1019                op_metadata,
1020            } => HydroRoot::SendExternal {
1021                to_external_key: *to_external_key,
1022                to_port_id: *to_port_id,
1023                to_many: *to_many,
1024                unpaired: *unpaired,
1025                serialize_fn: serialize_fn.clone(),
1026                instantiate_fn: instantiate_fn.clone(),
1027                input: Box::new(input.deep_clone(seen_tees)),
1028                op_metadata: op_metadata.clone(),
1029            },
1030            HydroRoot::DestSink {
1031                sink,
1032                input,
1033                op_metadata,
1034            } => HydroRoot::DestSink {
1035                sink: sink.clone(),
1036                input: Box::new(input.deep_clone(seen_tees)),
1037                op_metadata: op_metadata.clone(),
1038            },
1039            HydroRoot::CycleSink {
1040                cycle_id,
1041                input,
1042                op_metadata,
1043            } => HydroRoot::CycleSink {
1044                cycle_id: *cycle_id,
1045                input: Box::new(input.deep_clone(seen_tees)),
1046                op_metadata: op_metadata.clone(),
1047            },
1048            HydroRoot::EmbeddedOutput {
1049                ident,
1050                input,
1051                op_metadata,
1052            } => HydroRoot::EmbeddedOutput {
1053                ident: ident.clone(),
1054                input: Box::new(input.deep_clone(seen_tees)),
1055                op_metadata: op_metadata.clone(),
1056            },
1057        }
1058    }
1059
1060    #[cfg(feature = "build")]
1061    pub fn emit<'a, D: Deploy<'a>>(
1062        &mut self,
1063        graph_builders: &mut dyn DfirBuilder,
1064        seen_tees: &mut SeenTees,
1065        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1066        next_stmt_id: &mut usize,
1067    ) {
1068        self.emit_core::<D>(
1069            &mut BuildersOrCallback::<
1070                fn(&mut HydroRoot, &mut usize),
1071                fn(&mut HydroNode, &mut usize),
1072            >::Builders(graph_builders),
1073            seen_tees,
1074            built_tees,
1075            next_stmt_id,
1076        );
1077    }
1078
1079    #[cfg(feature = "build")]
1080    pub fn emit_core<'a, D: Deploy<'a>>(
1081        &mut self,
1082        builders_or_callback: &mut BuildersOrCallback<
1083            impl FnMut(&mut HydroRoot, &mut usize),
1084            impl FnMut(&mut HydroNode, &mut usize),
1085        >,
1086        seen_tees: &mut SeenTees,
1087        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1088        next_stmt_id: &mut usize,
1089    ) {
1090        match self {
1091            HydroRoot::ForEach { f, input, .. } => {
1092                let input_ident =
1093                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1094
1095                match builders_or_callback {
1096                    BuildersOrCallback::Builders(graph_builders) => {
1097                        graph_builders
1098                            .get_dfir_mut(&input.metadata().location_id)
1099                            .add_dfir(
1100                                parse_quote! {
1101                                    #input_ident -> for_each(#f);
1102                                },
1103                                None,
1104                                Some(&next_stmt_id.to_string()),
1105                            );
1106                    }
1107                    BuildersOrCallback::Callback(leaf_callback, _) => {
1108                        leaf_callback(self, next_stmt_id);
1109                    }
1110                }
1111
1112                *next_stmt_id += 1;
1113            }
1114
1115            HydroRoot::SendExternal {
1116                serialize_fn,
1117                instantiate_fn,
1118                input,
1119                ..
1120            } => {
1121                let input_ident =
1122                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1123
1124                match builders_or_callback {
1125                    BuildersOrCallback::Builders(graph_builders) => {
1126                        let (sink_expr, _) = match instantiate_fn {
1127                            DebugInstantiate::Building => (
1128                                syn::parse_quote!(DUMMY_SINK),
1129                                syn::parse_quote!(DUMMY_SOURCE),
1130                            ),
1131
1132                            DebugInstantiate::Finalized(finalized) => {
1133                                (finalized.sink.clone(), finalized.source.clone())
1134                            }
1135                        };
1136
1137                        graph_builders.create_external_output(
1138                            &input.metadata().location_id,
1139                            sink_expr,
1140                            &input_ident,
1141                            serialize_fn.as_ref(),
1142                            *next_stmt_id,
1143                        );
1144                    }
1145                    BuildersOrCallback::Callback(leaf_callback, _) => {
1146                        leaf_callback(self, next_stmt_id);
1147                    }
1148                }
1149
1150                *next_stmt_id += 1;
1151            }
1152
1153            HydroRoot::DestSink { sink, input, .. } => {
1154                let input_ident =
1155                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1156
1157                match builders_or_callback {
1158                    BuildersOrCallback::Builders(graph_builders) => {
1159                        graph_builders
1160                            .get_dfir_mut(&input.metadata().location_id)
1161                            .add_dfir(
1162                                parse_quote! {
1163                                    #input_ident -> dest_sink(#sink);
1164                                },
1165                                None,
1166                                Some(&next_stmt_id.to_string()),
1167                            );
1168                    }
1169                    BuildersOrCallback::Callback(leaf_callback, _) => {
1170                        leaf_callback(self, next_stmt_id);
1171                    }
1172                }
1173
1174                *next_stmt_id += 1;
1175            }
1176
1177            HydroRoot::CycleSink {
1178                cycle_id, input, ..
1179            } => {
1180                let input_ident =
1181                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1182
1183                match builders_or_callback {
1184                    BuildersOrCallback::Builders(graph_builders) => {
1185                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1186                            CollectionKind::KeyedSingleton {
1187                                key_type,
1188                                value_type,
1189                                ..
1190                            }
1191                            | CollectionKind::KeyedStream {
1192                                key_type,
1193                                value_type,
1194                                ..
1195                            } => {
1196                                parse_quote!((#key_type, #value_type))
1197                            }
1198                            CollectionKind::Stream { element_type, .. }
1199                            | CollectionKind::Singleton { element_type, .. }
1200                            | CollectionKind::Optional { element_type, .. } => {
1201                                parse_quote!(#element_type)
1202                            }
1203                        };
1204
1205                        let cycle_id_ident = cycle_id.as_ident();
1206                        graph_builders
1207                            .get_dfir_mut(&input.metadata().location_id)
1208                            .add_dfir(
1209                                parse_quote! {
1210                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1211                                },
1212                                None,
1213                                None,
1214                            );
1215                    }
1216                    // No ID, no callback
1217                    BuildersOrCallback::Callback(_, _) => {}
1218                }
1219            }
1220
1221            HydroRoot::EmbeddedOutput { ident, input, .. } => {
1222                let input_ident =
1223                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1224
1225                match builders_or_callback {
1226                    BuildersOrCallback::Builders(graph_builders) => {
1227                        graph_builders
1228                            .get_dfir_mut(&input.metadata().location_id)
1229                            .add_dfir(
1230                                parse_quote! {
1231                                    #input_ident -> for_each(&mut #ident);
1232                                },
1233                                None,
1234                                Some(&next_stmt_id.to_string()),
1235                            );
1236                    }
1237                    BuildersOrCallback::Callback(leaf_callback, _) => {
1238                        leaf_callback(self, next_stmt_id);
1239                    }
1240                }
1241
1242                *next_stmt_id += 1;
1243            }
1244        }
1245    }
1246
1247    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1248        match self {
1249            HydroRoot::ForEach { op_metadata, .. }
1250            | HydroRoot::SendExternal { op_metadata, .. }
1251            | HydroRoot::DestSink { op_metadata, .. }
1252            | HydroRoot::CycleSink { op_metadata, .. }
1253            | HydroRoot::EmbeddedOutput { op_metadata, .. } => op_metadata,
1254        }
1255    }
1256
1257    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1258        match self {
1259            HydroRoot::ForEach { op_metadata, .. }
1260            | HydroRoot::SendExternal { op_metadata, .. }
1261            | HydroRoot::DestSink { op_metadata, .. }
1262            | HydroRoot::CycleSink { op_metadata, .. }
1263            | HydroRoot::EmbeddedOutput { op_metadata, .. } => op_metadata,
1264        }
1265    }
1266
1267    pub fn input(&self) -> &HydroNode {
1268        match self {
1269            HydroRoot::ForEach { input, .. }
1270            | HydroRoot::SendExternal { input, .. }
1271            | HydroRoot::DestSink { input, .. }
1272            | HydroRoot::CycleSink { input, .. }
1273            | HydroRoot::EmbeddedOutput { input, .. } => input,
1274        }
1275    }
1276
1277    pub fn input_metadata(&self) -> &HydroIrMetadata {
1278        self.input().metadata()
1279    }
1280
1281    pub fn print_root(&self) -> String {
1282        match self {
1283            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1284            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1285            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1286            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1287            HydroRoot::EmbeddedOutput { ident, .. } => {
1288                format!("EmbeddedOutput({})", ident)
1289            }
1290        }
1291    }
1292
1293    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1294        match self {
1295            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1296                transform(f);
1297            }
1298            HydroRoot::SendExternal { .. }
1299            | HydroRoot::CycleSink { .. }
1300            | HydroRoot::EmbeddedOutput { .. } => {}
1301        }
1302    }
1303}
1304
/// Emits every root in `ir` and returns the resulting builders, keyed by location.
///
/// All roots share one tee-tracking state and one statement-id counter, which starts at
/// zero and is advanced by `HydroRoot::emit` as roots are emitted in slice order.
///
/// The roots are taken as a mutable slice (like `traverse_dfir`) because the function only
/// iterates over them; callers holding a `Vec` can keep passing `&mut vec`.
#[cfg(feature = "build")]
pub fn emit<'a, D: Deploy<'a>>(
    ir: &mut [HydroRoot],
) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders = SecondaryMap::new();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;
    for leaf in ir {
        leaf.emit::<D>(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        );
    }
    builders
}
1323
/// Walks every root in `ir` in callback mode, without building any DFIR.
///
/// Drives `HydroRoot::emit_core` with `transform_root` and `transform_node` packed into
/// the `Callback` variant. All roots share one tee-tracking state and one statement-id
/// counter starting at zero.
#[cfg(feature = "build")]
pub fn traverse_dfir<'a, D: Deploy<'a>>(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;

    for root in ir.iter_mut() {
        root.emit_core::<D>(
            &mut callback,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        );
    }
}
1343
1344pub fn transform_bottom_up(
1345    ir: &mut [HydroRoot],
1346    transform_root: &mut impl FnMut(&mut HydroRoot),
1347    transform_node: &mut impl FnMut(&mut HydroNode),
1348    check_well_formed: bool,
1349) {
1350    let mut seen_tees = HashMap::new();
1351    ir.iter_mut().for_each(|leaf| {
1352        leaf.transform_bottom_up(
1353            transform_root,
1354            transform_node,
1355            &mut seen_tees,
1356            check_well_formed,
1357        );
1358    });
1359}
1360
1361pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1362    let mut seen_tees = HashMap::new();
1363    ir.iter()
1364        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1365        .collect()
1366}
1367
1368type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1369thread_local! {
1370    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1371}
1372
1373pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1374    PRINTED_TEES.with(|printed_tees| {
1375        let mut printed_tees_mut = printed_tees.borrow_mut();
1376        *printed_tees_mut = Some((0, HashMap::new()));
1377        drop(printed_tees_mut);
1378
1379        let ret = f();
1380
1381        let mut printed_tees_mut = printed_tees.borrow_mut();
1382        *printed_tees_mut = None;
1383
1384        ret
1385    })
1386}
1387
1388pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1389
1390impl TeeNode {
1391    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1392        Rc::as_ptr(&self.0)
1393    }
1394}
1395
1396impl Debug for TeeNode {
1397    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1398        PRINTED_TEES.with(|printed_tees| {
1399            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1400            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1401
1402            if let Some(printed_tees_mut) = printed_tees_mut {
1403                if let Some(existing) = printed_tees_mut
1404                    .1
1405                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1406                {
1407                    write!(f, "<tee {}>", existing)
1408                } else {
1409                    let next_id = printed_tees_mut.0;
1410                    printed_tees_mut.0 += 1;
1411                    printed_tees_mut
1412                        .1
1413                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1414                    drop(printed_tees_mut_borrow);
1415                    write!(f, "<tee {}>: ", next_id)?;
1416                    Debug::fmt(&self.0.borrow(), f)
1417                }
1418            } else {
1419                drop(printed_tees_mut_borrow);
1420                write!(f, "<tee>: ")?;
1421                Debug::fmt(&self.0.borrow(), f)
1422            }
1423        })
1424    }
1425}
1426
1427impl Hash for TeeNode {
1428    fn hash<H: Hasher>(&self, state: &mut H) {
1429        self.0.borrow_mut().hash(state);
1430    }
1431}
1432
/// Boundedness tag recorded on a collection (see the `bound` fields of `CollectionKind`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BoundKind {
    /// The collection is not marked as bounded.
    Unbounded,
    /// The collection is marked as bounded.
    Bounded,
}
1438
/// Ordering tag recorded on a stream (see the `order` / `value_order` fields of
/// `CollectionKind`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamOrder {
    /// No ordering is recorded for the elements.
    NoOrder,
    /// The elements are recorded as totally ordered.
    TotalOrder,
}
1444
/// Retry tag recorded on a stream (see the `retry` / `value_retry` fields of
/// `CollectionKind`).
// NOTE(review): the names suggest delivery guarantees (duplicates possible vs. not);
// confirm against the code that sets these.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
1450
/// Boundedness tag for `CollectionKind::KeyedSingleton`, which has one more state than
/// `BoundKind`.
///
/// Only `Bounded` makes `CollectionKind::is_bounded` return `true`.
// NOTE(review): `BoundedValue` presumably means each value is bounded while the key set
// is not; confirm against the code that sets it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    BoundedValue,
    Bounded,
}
1457
1458#[derive(Clone, PartialEq, Eq, Debug)]
1459pub enum CollectionKind {
1460    Stream {
1461        bound: BoundKind,
1462        order: StreamOrder,
1463        retry: StreamRetry,
1464        element_type: DebugType,
1465    },
1466    Singleton {
1467        bound: BoundKind,
1468        element_type: DebugType,
1469    },
1470    Optional {
1471        bound: BoundKind,
1472        element_type: DebugType,
1473    },
1474    KeyedStream {
1475        bound: BoundKind,
1476        value_order: StreamOrder,
1477        value_retry: StreamRetry,
1478        key_type: DebugType,
1479        value_type: DebugType,
1480    },
1481    KeyedSingleton {
1482        bound: KeyedSingletonBoundKind,
1483        key_type: DebugType,
1484        value_type: DebugType,
1485    },
1486}
1487
1488impl CollectionKind {
1489    pub fn is_bounded(&self) -> bool {
1490        matches!(
1491            self,
1492            CollectionKind::Stream {
1493                bound: BoundKind::Bounded,
1494                ..
1495            } | CollectionKind::Singleton {
1496                bound: BoundKind::Bounded,
1497                ..
1498            } | CollectionKind::Optional {
1499                bound: BoundKind::Bounded,
1500                ..
1501            } | CollectionKind::KeyedStream {
1502                bound: BoundKind::Bounded,
1503                ..
1504            } | CollectionKind::KeyedSingleton {
1505                bound: KeyedSingletonBoundKind::Bounded,
1506                ..
1507            }
1508        )
1509    }
1510}
1511
1512#[derive(Clone)]
1513pub struct HydroIrMetadata {
1514    pub location_id: LocationId,
1515    pub collection_kind: CollectionKind,
1516    pub cardinality: Option<usize>,
1517    pub tag: Option<String>,
1518    pub op: HydroIrOpMetadata,
1519}
1520
1521// HydroIrMetadata shouldn't be used to hash or compare
1522impl Hash for HydroIrMetadata {
1523    fn hash<H: Hasher>(&self, _: &mut H) {}
1524}
1525
1526impl PartialEq for HydroIrMetadata {
1527    fn eq(&self, _: &Self) -> bool {
1528        true
1529    }
1530}
1531
1532impl Eq for HydroIrMetadata {}
1533
1534impl Debug for HydroIrMetadata {
1535    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1536        f.debug_struct("HydroIrMetadata")
1537            .field("location_id", &self.location_id)
1538            .field("collection_kind", &self.collection_kind)
1539            .finish()
1540    }
1541}
1542
1543/// Metadata that is specific to the operator itself, rather than its outputs.
1544/// This is available on _both_ inner nodes and roots.
1545#[derive(Clone)]
1546pub struct HydroIrOpMetadata {
1547    pub backtrace: Backtrace,
1548    pub cpu_usage: Option<f64>,
1549    pub network_recv_cpu_usage: Option<f64>,
1550    pub id: Option<usize>,
1551}
1552
1553impl HydroIrOpMetadata {
1554    #[expect(
1555        clippy::new_without_default,
1556        reason = "explicit calls to new ensure correct backtrace bounds"
1557    )]
1558    pub fn new() -> HydroIrOpMetadata {
1559        Self::new_with_skip(1)
1560    }
1561
1562    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1563        HydroIrOpMetadata {
1564            backtrace: Backtrace::get_backtrace(2 + skip_count),
1565            cpu_usage: None,
1566            network_recv_cpu_usage: None,
1567            id: None,
1568        }
1569    }
1570}
1571
1572impl Debug for HydroIrOpMetadata {
1573    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1574        f.debug_struct("HydroIrOpMetadata").finish()
1575    }
1576}
1577
1578impl Hash for HydroIrOpMetadata {
1579    fn hash<H: Hasher>(&self, _: &mut H) {}
1580}
1581
1582/// An intermediate node in a Hydro graph, which consumes data
1583/// from upstream nodes and emits data to downstream nodes.
1584#[derive(Debug, Hash)]
1585pub enum HydroNode {
1586    Placeholder,
1587
1588    /// Manually "casts" between two different collection kinds.
1589    ///
1590    /// Using this IR node requires special care, since it bypasses many of Hydro's core
1591    /// correctness checks. In particular, the user must ensure that every possible
1592    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
1593    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
1594    /// collection. This ensures that the simulator does not miss any possible outputs.
1595    Cast {
1596        inner: Box<HydroNode>,
1597        metadata: HydroIrMetadata,
1598    },
1599
1600    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
1601    /// interpretation of the input stream.
1602    ///
1603    /// In production, this simply passes through the input, but in simulation, this operator
1604    /// explicitly selects a randomized interpretation.
1605    ObserveNonDet {
1606        inner: Box<HydroNode>,
1607        trusted: bool, // if true, we do not need to simulate non-determinism
1608        metadata: HydroIrMetadata,
1609    },
1610
1611    Source {
1612        source: HydroSource,
1613        metadata: HydroIrMetadata,
1614    },
1615
1616    SingletonSource {
1617        value: DebugExpr,
1618        metadata: HydroIrMetadata,
1619    },
1620
1621    CycleSource {
1622        cycle_id: CycleId,
1623        metadata: HydroIrMetadata,
1624    },
1625
1626    Tee {
1627        inner: TeeNode,
1628        metadata: HydroIrMetadata,
1629    },
1630
1631    BeginAtomic {
1632        inner: Box<HydroNode>,
1633        metadata: HydroIrMetadata,
1634    },
1635
1636    EndAtomic {
1637        inner: Box<HydroNode>,
1638        metadata: HydroIrMetadata,
1639    },
1640
1641    Batch {
1642        inner: Box<HydroNode>,
1643        metadata: HydroIrMetadata,
1644    },
1645
1646    YieldConcat {
1647        inner: Box<HydroNode>,
1648        metadata: HydroIrMetadata,
1649    },
1650
1651    Chain {
1652        first: Box<HydroNode>,
1653        second: Box<HydroNode>,
1654        metadata: HydroIrMetadata,
1655    },
1656
1657    ChainFirst {
1658        first: Box<HydroNode>,
1659        second: Box<HydroNode>,
1660        metadata: HydroIrMetadata,
1661    },
1662
1663    CrossProduct {
1664        left: Box<HydroNode>,
1665        right: Box<HydroNode>,
1666        metadata: HydroIrMetadata,
1667    },
1668
1669    CrossSingleton {
1670        left: Box<HydroNode>,
1671        right: Box<HydroNode>,
1672        metadata: HydroIrMetadata,
1673    },
1674
1675    Join {
1676        left: Box<HydroNode>,
1677        right: Box<HydroNode>,
1678        metadata: HydroIrMetadata,
1679    },
1680
1681    Difference {
1682        pos: Box<HydroNode>,
1683        neg: Box<HydroNode>,
1684        metadata: HydroIrMetadata,
1685    },
1686
1687    AntiJoin {
1688        pos: Box<HydroNode>,
1689        neg: Box<HydroNode>,
1690        metadata: HydroIrMetadata,
1691    },
1692
1693    ResolveFutures {
1694        input: Box<HydroNode>,
1695        metadata: HydroIrMetadata,
1696    },
1697    ResolveFuturesOrdered {
1698        input: Box<HydroNode>,
1699        metadata: HydroIrMetadata,
1700    },
1701
1702    Map {
1703        f: DebugExpr,
1704        input: Box<HydroNode>,
1705        metadata: HydroIrMetadata,
1706    },
1707    FlatMap {
1708        f: DebugExpr,
1709        input: Box<HydroNode>,
1710        metadata: HydroIrMetadata,
1711    },
1712    Filter {
1713        f: DebugExpr,
1714        input: Box<HydroNode>,
1715        metadata: HydroIrMetadata,
1716    },
1717    FilterMap {
1718        f: DebugExpr,
1719        input: Box<HydroNode>,
1720        metadata: HydroIrMetadata,
1721    },
1722
1723    DeferTick {
1724        input: Box<HydroNode>,
1725        metadata: HydroIrMetadata,
1726    },
1727    Enumerate {
1728        input: Box<HydroNode>,
1729        metadata: HydroIrMetadata,
1730    },
1731    Inspect {
1732        f: DebugExpr,
1733        input: Box<HydroNode>,
1734        metadata: HydroIrMetadata,
1735    },
1736
1737    Unique {
1738        input: Box<HydroNode>,
1739        metadata: HydroIrMetadata,
1740    },
1741
1742    Sort {
1743        input: Box<HydroNode>,
1744        metadata: HydroIrMetadata,
1745    },
1746    Fold {
1747        init: DebugExpr,
1748        acc: DebugExpr,
1749        input: Box<HydroNode>,
1750        metadata: HydroIrMetadata,
1751    },
1752
1753    Scan {
1754        init: DebugExpr,
1755        acc: DebugExpr,
1756        input: Box<HydroNode>,
1757        metadata: HydroIrMetadata,
1758    },
1759    FoldKeyed {
1760        init: DebugExpr,
1761        acc: DebugExpr,
1762        input: Box<HydroNode>,
1763        metadata: HydroIrMetadata,
1764    },
1765
1766    Reduce {
1767        f: DebugExpr,
1768        input: Box<HydroNode>,
1769        metadata: HydroIrMetadata,
1770    },
1771    ReduceKeyed {
1772        f: DebugExpr,
1773        input: Box<HydroNode>,
1774        metadata: HydroIrMetadata,
1775    },
1776    ReduceKeyedWatermark {
1777        f: DebugExpr,
1778        input: Box<HydroNode>,
1779        watermark: Box<HydroNode>,
1780        metadata: HydroIrMetadata,
1781    },
1782
1783    Network {
1784        name: Option<String>,
1785        serialize_fn: Option<DebugExpr>,
1786        instantiate_fn: DebugInstantiate,
1787        deserialize_fn: Option<DebugExpr>,
1788        input: Box<HydroNode>,
1789        metadata: HydroIrMetadata,
1790    },
1791
1792    ExternalInput {
1793        from_external_key: LocationKey,
1794        from_port_id: ExternalPortId,
1795        from_many: bool,
1796        codec_type: DebugType,
1797        port_hint: NetworkHint,
1798        instantiate_fn: DebugInstantiate,
1799        deserialize_fn: Option<DebugExpr>,
1800        metadata: HydroIrMetadata,
1801    },
1802
1803    Counter {
1804        tag: String,
1805        duration: DebugExpr,
1806        prefix: String,
1807        input: Box<HydroNode>,
1808        metadata: HydroIrMetadata,
1809    },
1810}
1811
/// Memo table used while walking an IR graph that contains tees.
///
/// The key is the address of a tee's original shared `RefCell<HydroNode>`; the value is the
/// replacement cell (transformed or cloned) that every reference to that tee is re-pointed to,
/// so the node behind a tee is only processed the first time it is reached.
pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Map from the address of a tee's shared `RefCell<HydroNode>` to a [`LocationId`].
pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1814
1815impl HydroNode {
1816    pub fn transform_bottom_up(
1817        &mut self,
1818        transform: &mut impl FnMut(&mut HydroNode),
1819        seen_tees: &mut SeenTees,
1820        check_well_formed: bool,
1821    ) {
1822        self.transform_children(
1823            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1824            seen_tees,
1825        );
1826
1827        transform(self);
1828
1829        let self_location = self.metadata().location_id.root();
1830
1831        if check_well_formed {
1832            match &*self {
1833                HydroNode::Network { .. } => {}
1834                _ => {
1835                    self.input_metadata().iter().for_each(|i| {
1836                        if i.location_id.root() != self_location {
1837                            panic!(
1838                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1839                                i,
1840                                i.location_id.root(),
1841                                self,
1842                                self_location
1843                            )
1844                        }
1845                    });
1846                }
1847            }
1848        }
1849    }
1850
1851    #[inline(always)]
1852    pub fn transform_children(
1853        &mut self,
1854        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1855        seen_tees: &mut SeenTees,
1856    ) {
1857        match self {
1858            HydroNode::Placeholder => {
1859                panic!();
1860            }
1861
1862            HydroNode::Source { .. }
1863            | HydroNode::SingletonSource { .. }
1864            | HydroNode::CycleSource { .. }
1865            | HydroNode::ExternalInput { .. } => {}
1866
1867            HydroNode::Tee { inner, .. } => {
1868                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1869                    *inner = TeeNode(transformed.clone());
1870                } else {
1871                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1872                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1873                    let mut orig = inner.0.replace(HydroNode::Placeholder);
1874                    transform(&mut orig, seen_tees);
1875                    *transformed_cell.borrow_mut() = orig;
1876                    *inner = TeeNode(transformed_cell);
1877                }
1878            }
1879
1880            HydroNode::Cast { inner, .. }
1881            | HydroNode::ObserveNonDet { inner, .. }
1882            | HydroNode::BeginAtomic { inner, .. }
1883            | HydroNode::EndAtomic { inner, .. }
1884            | HydroNode::Batch { inner, .. }
1885            | HydroNode::YieldConcat { inner, .. } => {
1886                transform(inner.as_mut(), seen_tees);
1887            }
1888
1889            HydroNode::Chain { first, second, .. } => {
1890                transform(first.as_mut(), seen_tees);
1891                transform(second.as_mut(), seen_tees);
1892            }
1893
1894            HydroNode::ChainFirst { first, second, .. } => {
1895                transform(first.as_mut(), seen_tees);
1896                transform(second.as_mut(), seen_tees);
1897            }
1898
1899            HydroNode::CrossSingleton { left, right, .. }
1900            | HydroNode::CrossProduct { left, right, .. }
1901            | HydroNode::Join { left, right, .. } => {
1902                transform(left.as_mut(), seen_tees);
1903                transform(right.as_mut(), seen_tees);
1904            }
1905
1906            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1907                transform(pos.as_mut(), seen_tees);
1908                transform(neg.as_mut(), seen_tees);
1909            }
1910
1911            HydroNode::ReduceKeyedWatermark {
1912                input, watermark, ..
1913            } => {
1914                transform(input.as_mut(), seen_tees);
1915                transform(watermark.as_mut(), seen_tees);
1916            }
1917
1918            HydroNode::Map { input, .. }
1919            | HydroNode::ResolveFutures { input, .. }
1920            | HydroNode::ResolveFuturesOrdered { input, .. }
1921            | HydroNode::FlatMap { input, .. }
1922            | HydroNode::Filter { input, .. }
1923            | HydroNode::FilterMap { input, .. }
1924            | HydroNode::Sort { input, .. }
1925            | HydroNode::DeferTick { input, .. }
1926            | HydroNode::Enumerate { input, .. }
1927            | HydroNode::Inspect { input, .. }
1928            | HydroNode::Unique { input, .. }
1929            | HydroNode::Network { input, .. }
1930            | HydroNode::Fold { input, .. }
1931            | HydroNode::Scan { input, .. }
1932            | HydroNode::FoldKeyed { input, .. }
1933            | HydroNode::Reduce { input, .. }
1934            | HydroNode::ReduceKeyed { input, .. }
1935            | HydroNode::Counter { input, .. } => {
1936                transform(input.as_mut(), seen_tees);
1937            }
1938        }
1939    }
1940
1941    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1942        match self {
1943            HydroNode::Placeholder => HydroNode::Placeholder,
1944            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1945                inner: Box::new(inner.deep_clone(seen_tees)),
1946                metadata: metadata.clone(),
1947            },
1948            HydroNode::ObserveNonDet {
1949                inner,
1950                trusted,
1951                metadata,
1952            } => HydroNode::ObserveNonDet {
1953                inner: Box::new(inner.deep_clone(seen_tees)),
1954                trusted: *trusted,
1955                metadata: metadata.clone(),
1956            },
1957            HydroNode::Source { source, metadata } => HydroNode::Source {
1958                source: source.clone(),
1959                metadata: metadata.clone(),
1960            },
1961            HydroNode::SingletonSource { value, metadata } => HydroNode::SingletonSource {
1962                value: value.clone(),
1963                metadata: metadata.clone(),
1964            },
1965            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
1966                cycle_id: *cycle_id,
1967                metadata: metadata.clone(),
1968            },
1969            HydroNode::Tee { inner, metadata } => {
1970                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1971                    HydroNode::Tee {
1972                        inner: TeeNode(transformed.clone()),
1973                        metadata: metadata.clone(),
1974                    }
1975                } else {
1976                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1977                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
1978                    let cloned = inner.0.borrow().deep_clone(seen_tees);
1979                    *new_rc.borrow_mut() = cloned;
1980                    HydroNode::Tee {
1981                        inner: TeeNode(new_rc),
1982                        metadata: metadata.clone(),
1983                    }
1984                }
1985            }
1986            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
1987                inner: Box::new(inner.deep_clone(seen_tees)),
1988                metadata: metadata.clone(),
1989            },
1990            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
1991                inner: Box::new(inner.deep_clone(seen_tees)),
1992                metadata: metadata.clone(),
1993            },
1994            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
1995                inner: Box::new(inner.deep_clone(seen_tees)),
1996                metadata: metadata.clone(),
1997            },
1998            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
1999                inner: Box::new(inner.deep_clone(seen_tees)),
2000                metadata: metadata.clone(),
2001            },
2002            HydroNode::Chain {
2003                first,
2004                second,
2005                metadata,
2006            } => HydroNode::Chain {
2007                first: Box::new(first.deep_clone(seen_tees)),
2008                second: Box::new(second.deep_clone(seen_tees)),
2009                metadata: metadata.clone(),
2010            },
2011            HydroNode::ChainFirst {
2012                first,
2013                second,
2014                metadata,
2015            } => HydroNode::ChainFirst {
2016                first: Box::new(first.deep_clone(seen_tees)),
2017                second: Box::new(second.deep_clone(seen_tees)),
2018                metadata: metadata.clone(),
2019            },
2020            HydroNode::CrossProduct {
2021                left,
2022                right,
2023                metadata,
2024            } => HydroNode::CrossProduct {
2025                left: Box::new(left.deep_clone(seen_tees)),
2026                right: Box::new(right.deep_clone(seen_tees)),
2027                metadata: metadata.clone(),
2028            },
2029            HydroNode::CrossSingleton {
2030                left,
2031                right,
2032                metadata,
2033            } => HydroNode::CrossSingleton {
2034                left: Box::new(left.deep_clone(seen_tees)),
2035                right: Box::new(right.deep_clone(seen_tees)),
2036                metadata: metadata.clone(),
2037            },
2038            HydroNode::Join {
2039                left,
2040                right,
2041                metadata,
2042            } => HydroNode::Join {
2043                left: Box::new(left.deep_clone(seen_tees)),
2044                right: Box::new(right.deep_clone(seen_tees)),
2045                metadata: metadata.clone(),
2046            },
2047            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2048                pos: Box::new(pos.deep_clone(seen_tees)),
2049                neg: Box::new(neg.deep_clone(seen_tees)),
2050                metadata: metadata.clone(),
2051            },
2052            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2053                pos: Box::new(pos.deep_clone(seen_tees)),
2054                neg: Box::new(neg.deep_clone(seen_tees)),
2055                metadata: metadata.clone(),
2056            },
2057            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2058                input: Box::new(input.deep_clone(seen_tees)),
2059                metadata: metadata.clone(),
2060            },
2061            HydroNode::ResolveFuturesOrdered { input, metadata } => {
2062                HydroNode::ResolveFuturesOrdered {
2063                    input: Box::new(input.deep_clone(seen_tees)),
2064                    metadata: metadata.clone(),
2065                }
2066            }
2067            HydroNode::Map { f, input, metadata } => HydroNode::Map {
2068                f: f.clone(),
2069                input: Box::new(input.deep_clone(seen_tees)),
2070                metadata: metadata.clone(),
2071            },
2072            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2073                f: f.clone(),
2074                input: Box::new(input.deep_clone(seen_tees)),
2075                metadata: metadata.clone(),
2076            },
2077            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2078                f: f.clone(),
2079                input: Box::new(input.deep_clone(seen_tees)),
2080                metadata: metadata.clone(),
2081            },
2082            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2083                f: f.clone(),
2084                input: Box::new(input.deep_clone(seen_tees)),
2085                metadata: metadata.clone(),
2086            },
2087            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2088                input: Box::new(input.deep_clone(seen_tees)),
2089                metadata: metadata.clone(),
2090            },
2091            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2092                input: Box::new(input.deep_clone(seen_tees)),
2093                metadata: metadata.clone(),
2094            },
2095            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2096                f: f.clone(),
2097                input: Box::new(input.deep_clone(seen_tees)),
2098                metadata: metadata.clone(),
2099            },
2100            HydroNode::Unique { input, metadata } => HydroNode::Unique {
2101                input: Box::new(input.deep_clone(seen_tees)),
2102                metadata: metadata.clone(),
2103            },
2104            HydroNode::Sort { input, metadata } => HydroNode::Sort {
2105                input: Box::new(input.deep_clone(seen_tees)),
2106                metadata: metadata.clone(),
2107            },
2108            HydroNode::Fold {
2109                init,
2110                acc,
2111                input,
2112                metadata,
2113            } => HydroNode::Fold {
2114                init: init.clone(),
2115                acc: acc.clone(),
2116                input: Box::new(input.deep_clone(seen_tees)),
2117                metadata: metadata.clone(),
2118            },
2119            HydroNode::Scan {
2120                init,
2121                acc,
2122                input,
2123                metadata,
2124            } => HydroNode::Scan {
2125                init: init.clone(),
2126                acc: acc.clone(),
2127                input: Box::new(input.deep_clone(seen_tees)),
2128                metadata: metadata.clone(),
2129            },
2130            HydroNode::FoldKeyed {
2131                init,
2132                acc,
2133                input,
2134                metadata,
2135            } => HydroNode::FoldKeyed {
2136                init: init.clone(),
2137                acc: acc.clone(),
2138                input: Box::new(input.deep_clone(seen_tees)),
2139                metadata: metadata.clone(),
2140            },
2141            HydroNode::ReduceKeyedWatermark {
2142                f,
2143                input,
2144                watermark,
2145                metadata,
2146            } => HydroNode::ReduceKeyedWatermark {
2147                f: f.clone(),
2148                input: Box::new(input.deep_clone(seen_tees)),
2149                watermark: Box::new(watermark.deep_clone(seen_tees)),
2150                metadata: metadata.clone(),
2151            },
2152            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2153                f: f.clone(),
2154                input: Box::new(input.deep_clone(seen_tees)),
2155                metadata: metadata.clone(),
2156            },
2157            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2158                f: f.clone(),
2159                input: Box::new(input.deep_clone(seen_tees)),
2160                metadata: metadata.clone(),
2161            },
2162            HydroNode::Network {
2163                name,
2164                serialize_fn,
2165                instantiate_fn,
2166                deserialize_fn,
2167                input,
2168                metadata,
2169            } => HydroNode::Network {
2170                name: name.clone(),
2171                serialize_fn: serialize_fn.clone(),
2172                instantiate_fn: instantiate_fn.clone(),
2173                deserialize_fn: deserialize_fn.clone(),
2174                input: Box::new(input.deep_clone(seen_tees)),
2175                metadata: metadata.clone(),
2176            },
2177            HydroNode::ExternalInput {
2178                from_external_key,
2179                from_port_id,
2180                from_many,
2181                codec_type,
2182                port_hint,
2183                instantiate_fn,
2184                deserialize_fn,
2185                metadata,
2186            } => HydroNode::ExternalInput {
2187                from_external_key: *from_external_key,
2188                from_port_id: *from_port_id,
2189                from_many: *from_many,
2190                codec_type: codec_type.clone(),
2191                port_hint: *port_hint,
2192                instantiate_fn: instantiate_fn.clone(),
2193                deserialize_fn: deserialize_fn.clone(),
2194                metadata: metadata.clone(),
2195            },
2196            HydroNode::Counter {
2197                tag,
2198                duration,
2199                prefix,
2200                input,
2201                metadata,
2202            } => HydroNode::Counter {
2203                tag: tag.clone(),
2204                duration: duration.clone(),
2205                prefix: prefix.clone(),
2206                input: Box::new(input.deep_clone(seen_tees)),
2207                metadata: metadata.clone(),
2208            },
2209        }
2210    }
2211
2212    #[cfg(feature = "build")]
2213    pub fn emit_core<'a, D: Deploy<'a>>(
2214        &mut self,
2215        builders_or_callback: &mut BuildersOrCallback<
2216            impl FnMut(&mut HydroRoot, &mut usize),
2217            impl FnMut(&mut HydroNode, &mut usize),
2218        >,
2219        seen_tees: &mut SeenTees,
2220        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2221        next_stmt_id: &mut usize,
2222    ) -> syn::Ident {
2223        let mut ident_stack: Vec<syn::Ident> = Vec::new();
2224
2225        self.transform_bottom_up(
2226            &mut |node: &mut HydroNode| {
2227                let out_location = node.metadata().location_id.clone();
2228                match node {
2229                    HydroNode::Placeholder => {
2230                        panic!()
2231                    }
2232
2233                    HydroNode::Cast { .. } => {
2234                        // Cast passes through the input ident unchanged
2235                        // The input ident is already on the stack from processing the child
2236                        match builders_or_callback {
2237                            BuildersOrCallback::Builders(_) => {}
2238                            BuildersOrCallback::Callback(_, node_callback) => {
2239                                node_callback(node, next_stmt_id);
2240                            }
2241                        }
2242
2243                        *next_stmt_id += 1;
2244                        // input_ident stays on stack as output
2245                    }
2246
2247                    HydroNode::ObserveNonDet {
2248                        inner,
2249                        trusted,
2250                        metadata,
2251                        ..
2252                    } => {
2253                        let inner_ident = ident_stack.pop().unwrap();
2254
2255                        let observe_ident =
2256                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2257
2258                        match builders_or_callback {
2259                            BuildersOrCallback::Builders(graph_builders) => {
2260                                graph_builders.observe_nondet(
2261                                    *trusted,
2262                                    &inner.metadata().location_id,
2263                                    inner_ident,
2264                                    &inner.metadata().collection_kind,
2265                                    &observe_ident,
2266                                    &metadata.collection_kind,
2267                                    &metadata.op,
2268                                );
2269                            }
2270                            BuildersOrCallback::Callback(_, node_callback) => {
2271                                node_callback(node, next_stmt_id);
2272                            }
2273                        }
2274
2275                        *next_stmt_id += 1;
2276
2277                        ident_stack.push(observe_ident);
2278                    }
2279
2280                    HydroNode::Batch {
2281                        inner, metadata, ..
2282                    } => {
2283                        let inner_ident = ident_stack.pop().unwrap();
2284
2285                        let batch_ident =
2286                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2287
2288                        match builders_or_callback {
2289                            BuildersOrCallback::Builders(graph_builders) => {
2290                                graph_builders.batch(
2291                                    inner_ident,
2292                                    &inner.metadata().location_id,
2293                                    &inner.metadata().collection_kind,
2294                                    &batch_ident,
2295                                    &out_location,
2296                                    &metadata.op,
2297                                );
2298                            }
2299                            BuildersOrCallback::Callback(_, node_callback) => {
2300                                node_callback(node, next_stmt_id);
2301                            }
2302                        }
2303
2304                        *next_stmt_id += 1;
2305
2306                        ident_stack.push(batch_ident);
2307                    }
2308
2309                    HydroNode::YieldConcat { inner, .. } => {
2310                        let inner_ident = ident_stack.pop().unwrap();
2311
2312                        let yield_ident =
2313                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2314
2315                        match builders_or_callback {
2316                            BuildersOrCallback::Builders(graph_builders) => {
2317                                graph_builders.yield_from_tick(
2318                                    inner_ident,
2319                                    &inner.metadata().location_id,
2320                                    &inner.metadata().collection_kind,
2321                                    &yield_ident,
2322                                    &out_location,
2323                                );
2324                            }
2325                            BuildersOrCallback::Callback(_, node_callback) => {
2326                                node_callback(node, next_stmt_id);
2327                            }
2328                        }
2329
2330                        *next_stmt_id += 1;
2331
2332                        ident_stack.push(yield_ident);
2333                    }
2334
2335                    HydroNode::BeginAtomic { inner, metadata } => {
2336                        let inner_ident = ident_stack.pop().unwrap();
2337
2338                        let begin_ident =
2339                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2340
2341                        match builders_or_callback {
2342                            BuildersOrCallback::Builders(graph_builders) => {
2343                                graph_builders.begin_atomic(
2344                                    inner_ident,
2345                                    &inner.metadata().location_id,
2346                                    &inner.metadata().collection_kind,
2347                                    &begin_ident,
2348                                    &out_location,
2349                                    &metadata.op,
2350                                );
2351                            }
2352                            BuildersOrCallback::Callback(_, node_callback) => {
2353                                node_callback(node, next_stmt_id);
2354                            }
2355                        }
2356
2357                        *next_stmt_id += 1;
2358
2359                        ident_stack.push(begin_ident);
2360                    }
2361
2362                    HydroNode::EndAtomic { inner, .. } => {
2363                        let inner_ident = ident_stack.pop().unwrap();
2364
2365                        let end_ident =
2366                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2367
2368                        match builders_or_callback {
2369                            BuildersOrCallback::Builders(graph_builders) => {
2370                                graph_builders.end_atomic(
2371                                    inner_ident,
2372                                    &inner.metadata().location_id,
2373                                    &inner.metadata().collection_kind,
2374                                    &end_ident,
2375                                );
2376                            }
2377                            BuildersOrCallback::Callback(_, node_callback) => {
2378                                node_callback(node, next_stmt_id);
2379                            }
2380                        }
2381
2382                        *next_stmt_id += 1;
2383
2384                        ident_stack.push(end_ident);
2385                    }
2386
2387                    HydroNode::Source {
2388                        source, metadata, ..
2389                    } => {
2390                        if let HydroSource::ExternalNetwork() = source {
2391                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2392                        } else {
2393                            let source_ident =
2394                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2395
2396                            let source_stmt = match source {
2397                                HydroSource::Stream(expr) => {
2398                                    debug_assert!(metadata.location_id.is_top_level());
2399                                    parse_quote! {
2400                                        #source_ident = source_stream(#expr);
2401                                    }
2402                                }
2403
2404                                HydroSource::ExternalNetwork() => {
2405                                    unreachable!()
2406                                }
2407
2408                                HydroSource::Iter(expr) => {
2409                                    if metadata.location_id.is_top_level() {
2410                                        parse_quote! {
2411                                            #source_ident = source_iter(#expr);
2412                                        }
2413                                    } else {
2414                                        // TODO(shadaj): a more natural semantics would be to re-evaluate the expression on each tick
2415                                        parse_quote! {
2416                                            #source_ident = source_iter(#expr) -> persist::<'static>();
2417                                        }
2418                                    }
2419                                }
2420
2421                                HydroSource::Spin() => {
2422                                    debug_assert!(metadata.location_id.is_top_level());
2423                                    parse_quote! {
2424                                        #source_ident = spin();
2425                                    }
2426                                }
2427
2428                                HydroSource::ClusterMembers(location_id) => {
2429                                    debug_assert!(metadata.location_id.is_top_level());
2430
2431                                    let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
2432                                        D::cluster_membership_stream(location_id),
2433                                        &(),
2434                                    );
2435
2436                                    parse_quote! {
2437                                        #source_ident = source_stream(#expr);
2438                                    }
2439                                }
2440
2441                                HydroSource::Embedded(ident) => {
2442                                    parse_quote! {
2443                                        #source_ident = source_stream(#ident);
2444                                    }
2445                                }
2446                            };
2447
2448                            match builders_or_callback {
2449                                BuildersOrCallback::Builders(graph_builders) => {
2450                                    let builder = graph_builders.get_dfir_mut(&out_location);
2451                                    builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2452                                }
2453                                BuildersOrCallback::Callback(_, node_callback) => {
2454                                    node_callback(node, next_stmt_id);
2455                                }
2456                            }
2457
2458                            *next_stmt_id += 1;
2459
2460                            ident_stack.push(source_ident);
2461                        }
2462                    }
2463
2464                    HydroNode::SingletonSource { value, metadata } => {
2465                        let source_ident =
2466                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2467
2468                        match builders_or_callback {
2469                            BuildersOrCallback::Builders(graph_builders) => {
2470                                let builder = graph_builders.get_dfir_mut(&out_location);
2471
2472                                if metadata.location_id.is_top_level()
2473                                    && metadata.collection_kind.is_bounded()
2474                                {
2475                                    builder.add_dfir(
2476                                        parse_quote! {
2477                                            #source_ident = source_iter([#value]);
2478                                        },
2479                                        None,
2480                                        Some(&next_stmt_id.to_string()),
2481                                    );
2482                                } else {
2483                                    builder.add_dfir(
2484                                        parse_quote! {
2485                                            #source_ident = source_iter([#value]) -> persist::<'static>();
2486                                        },
2487                                        None,
2488                                        Some(&next_stmt_id.to_string()),
2489                                    );
2490                                }
2491                            }
2492                            BuildersOrCallback::Callback(_, node_callback) => {
2493                                node_callback(node, next_stmt_id);
2494                            }
2495                        }
2496
2497                        *next_stmt_id += 1;
2498
2499                        ident_stack.push(source_ident);
2500                    }
2501
2502                    HydroNode::CycleSource { cycle_id, .. } => {
2503                        let ident = cycle_id.as_ident();
2504
2505                        match builders_or_callback {
2506                            BuildersOrCallback::Builders(_) => {}
2507                            BuildersOrCallback::Callback(_, node_callback) => {
2508                                node_callback(node, next_stmt_id);
2509                            }
2510                        }
2511
2512                        // consume a stmt id even though we did not emit anything so that we can instrument this
2513                        *next_stmt_id += 1;
2514
2515                        ident_stack.push(ident);
2516                    }
2517
2518                    HydroNode::Tee { inner, .. } => {
2519                        let ret_ident = if let Some(teed_from) =
2520                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2521                        {
2522                            match builders_or_callback {
2523                                BuildersOrCallback::Builders(_) => {}
2524                                BuildersOrCallback::Callback(_, node_callback) => {
2525                                    node_callback(node, next_stmt_id);
2526                                }
2527                            }
2528
2529                            teed_from.clone()
2530                        } else {
2531                            // The inner node was already processed by transform_bottom_up,
2532                            // so its ident is on the stack
2533                            let inner_ident = ident_stack.pop().unwrap();
2534
2535                            let tee_ident =
2536                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2537
2538                            built_tees.insert(
2539                                inner.0.as_ref() as *const RefCell<HydroNode>,
2540                                tee_ident.clone(),
2541                            );
2542
2543                            match builders_or_callback {
2544                                BuildersOrCallback::Builders(graph_builders) => {
2545                                    let builder = graph_builders.get_dfir_mut(&out_location);
2546                                    builder.add_dfir(
2547                                        parse_quote! {
2548                                            #tee_ident = #inner_ident -> tee();
2549                                        },
2550                                        None,
2551                                        Some(&next_stmt_id.to_string()),
2552                                    );
2553                                }
2554                                BuildersOrCallback::Callback(_, node_callback) => {
2555                                    node_callback(node, next_stmt_id);
2556                                }
2557                            }
2558
2559                            tee_ident
2560                        };
2561
2562                        // we consume a stmt id regardless of whether we emit the tee() operator,
2563                        // so that during rewrites we touch all recipients of the tee()
2564
2565                        *next_stmt_id += 1;
2566                        ident_stack.push(ret_ident);
2567                    }
2568
2569                    HydroNode::Chain { .. } => {
2570                        // Children are processed left-to-right, so second is on top
2571                        let second_ident = ident_stack.pop().unwrap();
2572                        let first_ident = ident_stack.pop().unwrap();
2573
2574                        let chain_ident =
2575                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2576
2577                        match builders_or_callback {
2578                            BuildersOrCallback::Builders(graph_builders) => {
2579                                let builder = graph_builders.get_dfir_mut(&out_location);
2580                                builder.add_dfir(
2581                                    parse_quote! {
2582                                        #chain_ident = chain();
2583                                        #first_ident -> [0]#chain_ident;
2584                                        #second_ident -> [1]#chain_ident;
2585                                    },
2586                                    None,
2587                                    Some(&next_stmt_id.to_string()),
2588                                );
2589                            }
2590                            BuildersOrCallback::Callback(_, node_callback) => {
2591                                node_callback(node, next_stmt_id);
2592                            }
2593                        }
2594
2595                        *next_stmt_id += 1;
2596
2597                        ident_stack.push(chain_ident);
2598                    }
2599
2600                    HydroNode::ChainFirst { .. } => {
2601                        let second_ident = ident_stack.pop().unwrap();
2602                        let first_ident = ident_stack.pop().unwrap();
2603
2604                        let chain_ident =
2605                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2606
2607                        match builders_or_callback {
2608                            BuildersOrCallback::Builders(graph_builders) => {
2609                                let builder = graph_builders.get_dfir_mut(&out_location);
2610                                builder.add_dfir(
2611                                    parse_quote! {
2612                                        #chain_ident = chain_first_n(1);
2613                                        #first_ident -> [0]#chain_ident;
2614                                        #second_ident -> [1]#chain_ident;
2615                                    },
2616                                    None,
2617                                    Some(&next_stmt_id.to_string()),
2618                                );
2619                            }
2620                            BuildersOrCallback::Callback(_, node_callback) => {
2621                                node_callback(node, next_stmt_id);
2622                            }
2623                        }
2624
2625                        *next_stmt_id += 1;
2626
2627                        ident_stack.push(chain_ident);
2628                    }
2629
2630                    HydroNode::CrossSingleton { right, .. } => {
2631                        let right_ident = ident_stack.pop().unwrap();
2632                        let left_ident = ident_stack.pop().unwrap();
2633
2634                        let cross_ident =
2635                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2636
2637                        match builders_or_callback {
2638                            BuildersOrCallback::Builders(graph_builders) => {
2639                                let builder = graph_builders.get_dfir_mut(&out_location);
2640
2641                                if right.metadata().location_id.is_top_level()
2642                                    && right.metadata().collection_kind.is_bounded()
2643                                {
2644                                    builder.add_dfir(
2645                                        parse_quote! {
2646                                            #cross_ident = cross_singleton();
2647                                            #left_ident -> [input]#cross_ident;
2648                                            #right_ident -> persist::<'static>() -> [single]#cross_ident;
2649                                        },
2650                                        None,
2651                                        Some(&next_stmt_id.to_string()),
2652                                    );
2653                                } else {
2654                                    builder.add_dfir(
2655                                        parse_quote! {
2656                                            #cross_ident = cross_singleton();
2657                                            #left_ident -> [input]#cross_ident;
2658                                            #right_ident -> [single]#cross_ident;
2659                                        },
2660                                        None,
2661                                        Some(&next_stmt_id.to_string()),
2662                                    );
2663                                }
2664                            }
2665                            BuildersOrCallback::Callback(_, node_callback) => {
2666                                node_callback(node, next_stmt_id);
2667                            }
2668                        }
2669
2670                        *next_stmt_id += 1;
2671
2672                        ident_stack.push(cross_ident);
2673                    }
2674
2675                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2676                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
2677                            parse_quote!(cross_join_multiset)
2678                        } else {
2679                            parse_quote!(join_multiset)
2680                        };
2681
2682                        let (HydroNode::CrossProduct { left, right, .. }
2683                        | HydroNode::Join { left, right, .. }) = node
2684                        else {
2685                            unreachable!()
2686                        };
2687
2688                        let is_top_level = left.metadata().location_id.is_top_level()
2689                            && right.metadata().location_id.is_top_level();
2690                        let left_lifetime = if left.metadata().location_id.is_top_level() {
2691                            quote!('static)
2692                        } else {
2693                            quote!('tick)
2694                        };
2695
2696                        let right_lifetime = if right.metadata().location_id.is_top_level() {
2697                            quote!('static)
2698                        } else {
2699                            quote!('tick)
2700                        };
2701
2702                        let right_ident = ident_stack.pop().unwrap();
2703                        let left_ident = ident_stack.pop().unwrap();
2704
2705                        let stream_ident =
2706                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2707
2708                        match builders_or_callback {
2709                            BuildersOrCallback::Builders(graph_builders) => {
2710                                let builder = graph_builders.get_dfir_mut(&out_location);
2711                                builder.add_dfir(
2712                                    if is_top_level {
2713                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
2714                                        // a multiset_delta() to negate the replay behavior
2715                                        parse_quote! {
2716                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2717                                            #left_ident -> [0]#stream_ident;
2718                                            #right_ident -> [1]#stream_ident;
2719                                        }
2720                                    } else {
2721                                        parse_quote! {
2722                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2723                                            #left_ident -> [0]#stream_ident;
2724                                            #right_ident -> [1]#stream_ident;
2725                                        }
2726                                    }
2727                                    ,
2728                                    None,
2729                                    Some(&next_stmt_id.to_string()),
2730                                );
2731                            }
2732                            BuildersOrCallback::Callback(_, node_callback) => {
2733                                node_callback(node, next_stmt_id);
2734                            }
2735                        }
2736
2737                        *next_stmt_id += 1;
2738
2739                        ident_stack.push(stream_ident);
2740                    }
2741
2742                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2743                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
2744                            parse_quote!(difference)
2745                        } else {
2746                            parse_quote!(anti_join)
2747                        };
2748
2749                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
2750                            node
2751                        else {
2752                            unreachable!()
2753                        };
2754
2755                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
2756                            quote!('static)
2757                        } else {
2758                            quote!('tick)
2759                        };
2760
2761                        let neg_ident = ident_stack.pop().unwrap();
2762                        let pos_ident = ident_stack.pop().unwrap();
2763
2764                        let stream_ident =
2765                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2766
2767                        match builders_or_callback {
2768                            BuildersOrCallback::Builders(graph_builders) => {
2769                                let builder = graph_builders.get_dfir_mut(&out_location);
2770                                builder.add_dfir(
2771                                    parse_quote! {
2772                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
2773                                        #pos_ident -> [pos]#stream_ident;
2774                                        #neg_ident -> [neg]#stream_ident;
2775                                    },
2776                                    None,
2777                                    Some(&next_stmt_id.to_string()),
2778                                );
2779                            }
2780                            BuildersOrCallback::Callback(_, node_callback) => {
2781                                node_callback(node, next_stmt_id);
2782                            }
2783                        }
2784
2785                        *next_stmt_id += 1;
2786
2787                        ident_stack.push(stream_ident);
2788                    }
2789
2790                    HydroNode::ResolveFutures { .. } => {
2791                        let input_ident = ident_stack.pop().unwrap();
2792
2793                        let futures_ident =
2794                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2795
2796                        match builders_or_callback {
2797                            BuildersOrCallback::Builders(graph_builders) => {
2798                                let builder = graph_builders.get_dfir_mut(&out_location);
2799                                builder.add_dfir(
2800                                    parse_quote! {
2801                                        #futures_ident = #input_ident -> resolve_futures();
2802                                    },
2803                                    None,
2804                                    Some(&next_stmt_id.to_string()),
2805                                );
2806                            }
2807                            BuildersOrCallback::Callback(_, node_callback) => {
2808                                node_callback(node, next_stmt_id);
2809                            }
2810                        }
2811
2812                        *next_stmt_id += 1;
2813
2814                        ident_stack.push(futures_ident);
2815                    }
2816
2817                    HydroNode::ResolveFuturesOrdered { .. } => {
2818                        let input_ident = ident_stack.pop().unwrap();
2819
2820                        let futures_ident =
2821                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2822
2823                        match builders_or_callback {
2824                            BuildersOrCallback::Builders(graph_builders) => {
2825                                let builder = graph_builders.get_dfir_mut(&out_location);
2826                                builder.add_dfir(
2827                                    parse_quote! {
2828                                        #futures_ident = #input_ident -> resolve_futures_ordered();
2829                                    },
2830                                    None,
2831                                    Some(&next_stmt_id.to_string()),
2832                                );
2833                            }
2834                            BuildersOrCallback::Callback(_, node_callback) => {
2835                                node_callback(node, next_stmt_id);
2836                            }
2837                        }
2838
2839                        *next_stmt_id += 1;
2840
2841                        ident_stack.push(futures_ident);
2842                    }
2843
2844                    HydroNode::Map { f, .. } => {
2845                        let input_ident = ident_stack.pop().unwrap();
2846
2847                        let map_ident =
2848                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2849
2850                        match builders_or_callback {
2851                            BuildersOrCallback::Builders(graph_builders) => {
2852                                let builder = graph_builders.get_dfir_mut(&out_location);
2853                                builder.add_dfir(
2854                                    parse_quote! {
2855                                        #map_ident = #input_ident -> map(#f);
2856                                    },
2857                                    None,
2858                                    Some(&next_stmt_id.to_string()),
2859                                );
2860                            }
2861                            BuildersOrCallback::Callback(_, node_callback) => {
2862                                node_callback(node, next_stmt_id);
2863                            }
2864                        }
2865
2866                        *next_stmt_id += 1;
2867
2868                        ident_stack.push(map_ident);
2869                    }
2870
2871                    HydroNode::FlatMap { f, .. } => {
2872                        let input_ident = ident_stack.pop().unwrap();
2873
2874                        let flat_map_ident =
2875                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2876
2877                        match builders_or_callback {
2878                            BuildersOrCallback::Builders(graph_builders) => {
2879                                let builder = graph_builders.get_dfir_mut(&out_location);
2880                                builder.add_dfir(
2881                                    parse_quote! {
2882                                        #flat_map_ident = #input_ident -> flat_map(#f);
2883                                    },
2884                                    None,
2885                                    Some(&next_stmt_id.to_string()),
2886                                );
2887                            }
2888                            BuildersOrCallback::Callback(_, node_callback) => {
2889                                node_callback(node, next_stmt_id);
2890                            }
2891                        }
2892
2893                        *next_stmt_id += 1;
2894
2895                        ident_stack.push(flat_map_ident);
2896                    }
2897
2898                    HydroNode::Filter { f, .. } => {
2899                        let input_ident = ident_stack.pop().unwrap();
2900
2901                        let filter_ident =
2902                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2903
2904                        match builders_or_callback {
2905                            BuildersOrCallback::Builders(graph_builders) => {
2906                                let builder = graph_builders.get_dfir_mut(&out_location);
2907                                builder.add_dfir(
2908                                    parse_quote! {
2909                                        #filter_ident = #input_ident -> filter(#f);
2910                                    },
2911                                    None,
2912                                    Some(&next_stmt_id.to_string()),
2913                                );
2914                            }
2915                            BuildersOrCallback::Callback(_, node_callback) => {
2916                                node_callback(node, next_stmt_id);
2917                            }
2918                        }
2919
2920                        *next_stmt_id += 1;
2921
2922                        ident_stack.push(filter_ident);
2923                    }
2924
2925                    HydroNode::FilterMap { f, .. } => {
2926                        let input_ident = ident_stack.pop().unwrap();
2927
2928                        let filter_map_ident =
2929                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2930
2931                        match builders_or_callback {
2932                            BuildersOrCallback::Builders(graph_builders) => {
2933                                let builder = graph_builders.get_dfir_mut(&out_location);
2934                                builder.add_dfir(
2935                                    parse_quote! {
2936                                        #filter_map_ident = #input_ident -> filter_map(#f);
2937                                    },
2938                                    None,
2939                                    Some(&next_stmt_id.to_string()),
2940                                );
2941                            }
2942                            BuildersOrCallback::Callback(_, node_callback) => {
2943                                node_callback(node, next_stmt_id);
2944                            }
2945                        }
2946
2947                        *next_stmt_id += 1;
2948
2949                        ident_stack.push(filter_map_ident);
2950                    }
2951
2952                    HydroNode::Sort { .. } => {
2953                        let input_ident = ident_stack.pop().unwrap();
2954
2955                        let sort_ident =
2956                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2957
2958                        match builders_or_callback {
2959                            BuildersOrCallback::Builders(graph_builders) => {
2960                                let builder = graph_builders.get_dfir_mut(&out_location);
2961                                builder.add_dfir(
2962                                    parse_quote! {
2963                                        #sort_ident = #input_ident -> sort();
2964                                    },
2965                                    None,
2966                                    Some(&next_stmt_id.to_string()),
2967                                );
2968                            }
2969                            BuildersOrCallback::Callback(_, node_callback) => {
2970                                node_callback(node, next_stmt_id);
2971                            }
2972                        }
2973
2974                        *next_stmt_id += 1;
2975
2976                        ident_stack.push(sort_ident);
2977                    }
2978
2979                    HydroNode::DeferTick { .. } => {
2980                        let input_ident = ident_stack.pop().unwrap();
2981
2982                        let defer_tick_ident =
2983                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2984
2985                        match builders_or_callback {
2986                            BuildersOrCallback::Builders(graph_builders) => {
2987                                let builder = graph_builders.get_dfir_mut(&out_location);
2988                                builder.add_dfir(
2989                                    parse_quote! {
2990                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
2991                                    },
2992                                    None,
2993                                    Some(&next_stmt_id.to_string()),
2994                                );
2995                            }
2996                            BuildersOrCallback::Callback(_, node_callback) => {
2997                                node_callback(node, next_stmt_id);
2998                            }
2999                        }
3000
3001                        *next_stmt_id += 1;
3002
3003                        ident_stack.push(defer_tick_ident);
3004                    }
3005
3006                    HydroNode::Enumerate { input, .. } => {
3007                        let input_ident = ident_stack.pop().unwrap();
3008
3009                        let enumerate_ident =
3010                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3011
3012                        match builders_or_callback {
3013                            BuildersOrCallback::Builders(graph_builders) => {
3014                                let builder = graph_builders.get_dfir_mut(&out_location);
3015                                let lifetime = if input.metadata().location_id.is_top_level() {
3016                                    quote!('static)
3017                                } else {
3018                                    quote!('tick)
3019                                };
3020                                builder.add_dfir(
3021                                    parse_quote! {
3022                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3023                                    },
3024                                    None,
3025                                    Some(&next_stmt_id.to_string()),
3026                                );
3027                            }
3028                            BuildersOrCallback::Callback(_, node_callback) => {
3029                                node_callback(node, next_stmt_id);
3030                            }
3031                        }
3032
3033                        *next_stmt_id += 1;
3034
3035                        ident_stack.push(enumerate_ident);
3036                    }
3037
3038                    HydroNode::Inspect { f, .. } => {
3039                        let input_ident = ident_stack.pop().unwrap();
3040
3041                        let inspect_ident =
3042                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3043
3044                        match builders_or_callback {
3045                            BuildersOrCallback::Builders(graph_builders) => {
3046                                let builder = graph_builders.get_dfir_mut(&out_location);
3047                                builder.add_dfir(
3048                                    parse_quote! {
3049                                        #inspect_ident = #input_ident -> inspect(#f);
3050                                    },
3051                                    None,
3052                                    Some(&next_stmt_id.to_string()),
3053                                );
3054                            }
3055                            BuildersOrCallback::Callback(_, node_callback) => {
3056                                node_callback(node, next_stmt_id);
3057                            }
3058                        }
3059
3060                        *next_stmt_id += 1;
3061
3062                        ident_stack.push(inspect_ident);
3063                    }
3064
3065                    HydroNode::Unique { input, .. } => {
3066                        let input_ident = ident_stack.pop().unwrap();
3067
3068                        let unique_ident =
3069                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3070
3071                        match builders_or_callback {
3072                            BuildersOrCallback::Builders(graph_builders) => {
3073                                let builder = graph_builders.get_dfir_mut(&out_location);
3074                                let lifetime = if input.metadata().location_id.is_top_level() {
3075                                    quote!('static)
3076                                } else {
3077                                    quote!('tick)
3078                                };
3079
3080                                builder.add_dfir(
3081                                    parse_quote! {
3082                                        #unique_ident = #input_ident -> unique::<#lifetime>();
3083                                    },
3084                                    None,
3085                                    Some(&next_stmt_id.to_string()),
3086                                );
3087                            }
3088                            BuildersOrCallback::Callback(_, node_callback) => {
3089                                node_callback(node, next_stmt_id);
3090                            }
3091                        }
3092
3093                        *next_stmt_id += 1;
3094
3095                        ident_stack.push(unique_ident);
3096                    }
3097
3098                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3099                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3100                            if input.metadata().location_id.is_top_level()
3101                                && input.metadata().collection_kind.is_bounded()
3102                            {
3103                                parse_quote!(fold_no_replay)
3104                            } else {
3105                                parse_quote!(fold)
3106                            }
3107                        } else if matches!(node, HydroNode::Scan { .. }) {
3108                            parse_quote!(scan)
3109                        } else if let HydroNode::FoldKeyed { input, .. } = node {
3110                            if input.metadata().location_id.is_top_level()
3111                                && input.metadata().collection_kind.is_bounded()
3112                            {
3113                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
3114                            } else {
3115                                parse_quote!(fold_keyed)
3116                            }
3117                        } else {
3118                            unreachable!()
3119                        };
3120
3121                        let (HydroNode::Fold { input, .. }
3122                        | HydroNode::FoldKeyed { input, .. }
3123                        | HydroNode::Scan { input, .. }) = node
3124                        else {
3125                            unreachable!()
3126                        };
3127
3128                        let lifetime = if input.metadata().location_id.is_top_level() {
3129                            quote!('static)
3130                        } else {
3131                            quote!('tick)
3132                        };
3133
3134                        let input_ident = ident_stack.pop().unwrap();
3135
3136                        let (HydroNode::Fold { init, acc, .. }
3137                        | HydroNode::FoldKeyed { init, acc, .. }
3138                        | HydroNode::Scan { init, acc, .. }) = &*node
3139                        else {
3140                            unreachable!()
3141                        };
3142
3143                        let fold_ident =
3144                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3145
3146                        match builders_or_callback {
3147                            BuildersOrCallback::Builders(graph_builders) => {
3148                                if matches!(node, HydroNode::Fold { .. })
3149                                    && node.metadata().location_id.is_top_level()
3150                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3151                                    && graph_builders.singleton_intermediates()
3152                                    && !node.metadata().collection_kind.is_bounded()
3153                                {
3154                                    let builder = graph_builders.get_dfir_mut(&out_location);
3155
3156                                    let acc: syn::Expr = parse_quote!({
3157                                        let mut __inner = #acc;
3158                                        move |__state, __value| {
3159                                            __inner(__state, __value);
3160                                            Some(__state.clone())
3161                                        }
3162                                    });
3163
3164                                    builder.add_dfir(
3165                                        parse_quote! {
3166                                            source_iter([(#init)()]) -> [0]#fold_ident;
3167                                            #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3168                                            #fold_ident = chain();
3169                                        },
3170                                        None,
3171                                        Some(&next_stmt_id.to_string()),
3172                                    );
3173                                } else if matches!(node, HydroNode::FoldKeyed { .. })
3174                                    && node.metadata().location_id.is_top_level()
3175                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3176                                    && graph_builders.singleton_intermediates()
3177                                    && !node.metadata().collection_kind.is_bounded()
3178                                {
3179                                    let builder = graph_builders.get_dfir_mut(&out_location);
3180
3181                                    let acc: syn::Expr = parse_quote!({
3182                                        let mut __init = #init;
3183                                        let mut __inner = #acc;
3184                                        move |__state, __kv: (_, _)| {
3185                                            // TODO(shadaj): we can avoid the clone when the entry exists
3186                                            let __state = __state
3187                                                .entry(::std::clone::Clone::clone(&__kv.0))
3188                                                .or_insert_with(|| (__init)());
3189                                            __inner(__state, __kv.1);
3190                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3191                                        }
3192                                    });
3193
3194                                    builder.add_dfir(
3195                                        parse_quote! {
3196                                            #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3197                                        },
3198                                        None,
3199                                        Some(&next_stmt_id.to_string()),
3200                                    );
3201                                } else {
3202                                    let builder = graph_builders.get_dfir_mut(&out_location);
3203                                    builder.add_dfir(
3204                                        parse_quote! {
3205                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3206                                        },
3207                                        None,
3208                                        Some(&next_stmt_id.to_string()),
3209                                    );
3210                                }
3211                            }
3212                            BuildersOrCallback::Callback(_, node_callback) => {
3213                                node_callback(node, next_stmt_id);
3214                            }
3215                        }
3216
3217                        *next_stmt_id += 1;
3218
3219                        ident_stack.push(fold_ident);
3220                    }
3221
3222                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3223                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3224                            if input.metadata().location_id.is_top_level()
3225                                && input.metadata().collection_kind.is_bounded()
3226                            {
3227                                parse_quote!(reduce_no_replay)
3228                            } else {
3229                                parse_quote!(reduce)
3230                            }
3231                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
3232                            if input.metadata().location_id.is_top_level()
3233                                && input.metadata().collection_kind.is_bounded()
3234                            {
3235                                todo!(
3236                                    "Calling keyed reduce on a top-level bounded collection is not supported"
3237                                )
3238                            } else {
3239                                parse_quote!(reduce_keyed)
3240                            }
3241                        } else {
3242                            unreachable!()
3243                        };
3244
3245                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3246                        else {
3247                            unreachable!()
3248                        };
3249
3250                        let lifetime = if input.metadata().location_id.is_top_level() {
3251                            quote!('static)
3252                        } else {
3253                            quote!('tick)
3254                        };
3255
3256                        let input_ident = ident_stack.pop().unwrap();
3257
3258                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3259                        else {
3260                            unreachable!()
3261                        };
3262
3263                        let reduce_ident =
3264                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3265
3266                        match builders_or_callback {
3267                            BuildersOrCallback::Builders(graph_builders) => {
3268                                if matches!(node, HydroNode::Reduce { .. })
3269                                    && node.metadata().location_id.is_top_level()
3270                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3271                                    && graph_builders.singleton_intermediates()
3272                                    && !node.metadata().collection_kind.is_bounded()
3273                                {
3274                                    todo!(
3275                                        "Reduce with optional intermediates is not yet supported in simulator"
3276                                    );
3277                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
3278                                    && node.metadata().location_id.is_top_level()
3279                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3280                                    && graph_builders.singleton_intermediates()
3281                                    && !node.metadata().collection_kind.is_bounded()
3282                                {
3283                                    todo!(
3284                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
3285                                    );
3286                                } else {
3287                                    let builder = graph_builders.get_dfir_mut(&out_location);
3288                                    builder.add_dfir(
3289                                        parse_quote! {
3290                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3291                                        },
3292                                        None,
3293                                        Some(&next_stmt_id.to_string()),
3294                                    );
3295                                }
3296                            }
3297                            BuildersOrCallback::Callback(_, node_callback) => {
3298                                node_callback(node, next_stmt_id);
3299                            }
3300                        }
3301
3302                        *next_stmt_id += 1;
3303
3304                        ident_stack.push(reduce_ident);
3305                    }
3306
3307                    HydroNode::ReduceKeyedWatermark {
3308                        f,
3309                        input,
3310                        metadata,
3311                        ..
3312                    } => {
3313                        let lifetime = if input.metadata().location_id.is_top_level() {
3314                            quote!('static)
3315                        } else {
3316                            quote!('tick)
3317                        };
3318
3319                        // watermark is processed second, so it's on top
3320                        let watermark_ident = ident_stack.pop().unwrap();
3321                        let input_ident = ident_stack.pop().unwrap();
3322
3323                        let chain_ident = syn::Ident::new(
3324                            &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3325                            Span::call_site(),
3326                        );
3327
3328                        let fold_ident =
3329                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3330
3331                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3332                            && input.metadata().collection_kind.is_bounded()
3333                        {
3334                            parse_quote!(fold_no_replay)
3335                        } else {
3336                            parse_quote!(fold)
3337                        };
3338
3339                        match builders_or_callback {
3340                            BuildersOrCallback::Builders(graph_builders) => {
3341                                if metadata.location_id.is_top_level()
3342                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3343                                    && graph_builders.singleton_intermediates()
3344                                    && !metadata.collection_kind.is_bounded()
3345                                {
3346                                    todo!(
3347                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3348                                    )
3349                                } else {
3350                                    let builder = graph_builders.get_dfir_mut(&out_location);
3351                                    builder.add_dfir(
3352                                        parse_quote! {
3353                                            #chain_ident = chain();
3354                                            #input_ident
3355                                                -> map(|x| (Some(x), None))
3356                                                -> [0]#chain_ident;
3357                                            #watermark_ident
3358                                                -> map(|watermark| (None, Some(watermark)))
3359                                                -> [1]#chain_ident;
3360
3361                                            #fold_ident = #chain_ident
3362                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3363                                                    let __reduce_keyed_fn = #f;
3364                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3365                                                        if let Some((k, v)) = opt_payload {
3366                                                            if let Some(curr_watermark) = *opt_curr_watermark {
3367                                                                if k <= curr_watermark {
3368                                                                    return;
3369                                                                }
3370                                                            }
3371                                                            match map.entry(k) {
3372                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
3373                                                                    e.insert(v);
3374                                                                }
3375                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
3376                                                                    __reduce_keyed_fn(e.get_mut(), v);
3377                                                                }
3378                                                            }
3379                                                        } else {
3380                                                            let watermark = opt_watermark.unwrap();
3381                                                            if let Some(curr_watermark) = *opt_curr_watermark {
3382                                                                if watermark <= curr_watermark {
3383                                                                    return;
3384                                                                }
3385                                                            }
3386                                                            *opt_curr_watermark = opt_watermark;
3387                                                            map.retain(|k, _| *k > watermark);
3388                                                        }
3389                                                    }
3390                                                })
3391                                                -> flat_map(|(map, _curr_watermark)| map);
3392                                        },
3393                                        None,
3394                                        Some(&next_stmt_id.to_string()),
3395                                    );
3396                                }
3397                            }
3398                            BuildersOrCallback::Callback(_, node_callback) => {
3399                                node_callback(node, next_stmt_id);
3400                            }
3401                        }
3402
3403                        *next_stmt_id += 1;
3404
3405                        ident_stack.push(fold_ident);
3406                    }
3407
3408                    HydroNode::Network {
3409                        serialize_fn: serialize_pipeline,
3410                        instantiate_fn,
3411                        deserialize_fn: deserialize_pipeline,
3412                        input,
3413                        ..
3414                    } => {
3415                        let input_ident = ident_stack.pop().unwrap();
3416
3417                        let receiver_stream_ident =
3418                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3419
3420                        match builders_or_callback {
3421                            BuildersOrCallback::Builders(graph_builders) => {
3422                                let (sink_expr, source_expr) = match instantiate_fn {
3423                                    DebugInstantiate::Building => (
3424                                        syn::parse_quote!(DUMMY_SINK),
3425                                        syn::parse_quote!(DUMMY_SOURCE),
3426                                    ),
3427
3428                                    DebugInstantiate::Finalized(finalized) => {
3429                                        (finalized.sink.clone(), finalized.source.clone())
3430                                    }
3431                                };
3432
3433                                graph_builders.create_network(
3434                                    &input.metadata().location_id,
3435                                    &out_location,
3436                                    input_ident,
3437                                    &receiver_stream_ident,
3438                                    serialize_pipeline.as_ref(),
3439                                    sink_expr,
3440                                    source_expr,
3441                                    deserialize_pipeline.as_ref(),
3442                                    *next_stmt_id,
3443                                );
3444                            }
3445                            BuildersOrCallback::Callback(_, node_callback) => {
3446                                node_callback(node, next_stmt_id);
3447                            }
3448                        }
3449
3450                        *next_stmt_id += 1;
3451
3452                        ident_stack.push(receiver_stream_ident);
3453                    }
3454
3455                    HydroNode::ExternalInput {
3456                        instantiate_fn,
3457                        deserialize_fn: deserialize_pipeline,
3458                        ..
3459                    } => {
3460                        let receiver_stream_ident =
3461                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3462
3463                        match builders_or_callback {
3464                            BuildersOrCallback::Builders(graph_builders) => {
3465                                let (_, source_expr) = match instantiate_fn {
3466                                    DebugInstantiate::Building => (
3467                                        syn::parse_quote!(DUMMY_SINK),
3468                                        syn::parse_quote!(DUMMY_SOURCE),
3469                                    ),
3470
3471                                    DebugInstantiate::Finalized(finalized) => {
3472                                        (finalized.sink.clone(), finalized.source.clone())
3473                                    }
3474                                };
3475
3476                                graph_builders.create_external_source(
3477                                    &out_location,
3478                                    source_expr,
3479                                    &receiver_stream_ident,
3480                                    deserialize_pipeline.as_ref(),
3481                                    *next_stmt_id,
3482                                );
3483                            }
3484                            BuildersOrCallback::Callback(_, node_callback) => {
3485                                node_callback(node, next_stmt_id);
3486                            }
3487                        }
3488
3489                        *next_stmt_id += 1;
3490
3491                        ident_stack.push(receiver_stream_ident);
3492                    }
3493
3494                    HydroNode::Counter {
3495                        tag,
3496                        duration,
3497                        prefix,
3498                        ..
3499                    } => {
3500                        let input_ident = ident_stack.pop().unwrap();
3501
3502                        let counter_ident =
3503                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3504
3505                        match builders_or_callback {
3506                            BuildersOrCallback::Builders(graph_builders) => {
3507                                let arg = format!("{}({})", prefix, tag);
3508                                let builder = graph_builders.get_dfir_mut(&out_location);
3509                                builder.add_dfir(
3510                                    parse_quote! {
3511                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
3512                                    },
3513                                    None,
3514                                    Some(&next_stmt_id.to_string()),
3515                                );
3516                            }
3517                            BuildersOrCallback::Callback(_, node_callback) => {
3518                                node_callback(node, next_stmt_id);
3519                            }
3520                        }
3521
3522                        *next_stmt_id += 1;
3523
3524                        ident_stack.push(counter_ident);
3525                    }
3526                }
3527            },
3528            seen_tees,
3529            false,
3530        );
3531
3532        ident_stack
3533            .pop()
3534            .expect("ident_stack should have exactly one element after traversal")
3535    }
3536
3537    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3538        match self {
3539            HydroNode::Placeholder => {
3540                panic!()
3541            }
3542            HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3543            HydroNode::Source { source, .. } => match source {
3544                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3545                HydroSource::ExternalNetwork()
3546                | HydroSource::Spin()
3547                | HydroSource::ClusterMembers(_)
3548                | HydroSource::Embedded(_) => {} // TODO: what goes here?
3549            },
3550            HydroNode::SingletonSource { value, .. } => {
3551                transform(value);
3552            }
3553            HydroNode::CycleSource { .. }
3554            | HydroNode::Tee { .. }
3555            | HydroNode::YieldConcat { .. }
3556            | HydroNode::BeginAtomic { .. }
3557            | HydroNode::EndAtomic { .. }
3558            | HydroNode::Batch { .. }
3559            | HydroNode::Chain { .. }
3560            | HydroNode::ChainFirst { .. }
3561            | HydroNode::CrossProduct { .. }
3562            | HydroNode::CrossSingleton { .. }
3563            | HydroNode::ResolveFutures { .. }
3564            | HydroNode::ResolveFuturesOrdered { .. }
3565            | HydroNode::Join { .. }
3566            | HydroNode::Difference { .. }
3567            | HydroNode::AntiJoin { .. }
3568            | HydroNode::DeferTick { .. }
3569            | HydroNode::Enumerate { .. }
3570            | HydroNode::Unique { .. }
3571            | HydroNode::Sort { .. } => {}
3572            HydroNode::Map { f, .. }
3573            | HydroNode::FlatMap { f, .. }
3574            | HydroNode::Filter { f, .. }
3575            | HydroNode::FilterMap { f, .. }
3576            | HydroNode::Inspect { f, .. }
3577            | HydroNode::Reduce { f, .. }
3578            | HydroNode::ReduceKeyed { f, .. }
3579            | HydroNode::ReduceKeyedWatermark { f, .. } => {
3580                transform(f);
3581            }
3582            HydroNode::Fold { init, acc, .. }
3583            | HydroNode::Scan { init, acc, .. }
3584            | HydroNode::FoldKeyed { init, acc, .. } => {
3585                transform(init);
3586                transform(acc);
3587            }
3588            HydroNode::Network {
3589                serialize_fn,
3590                deserialize_fn,
3591                ..
3592            } => {
3593                if let Some(serialize_fn) = serialize_fn {
3594                    transform(serialize_fn);
3595                }
3596                if let Some(deserialize_fn) = deserialize_fn {
3597                    transform(deserialize_fn);
3598                }
3599            }
3600            HydroNode::ExternalInput { deserialize_fn, .. } => {
3601                if let Some(deserialize_fn) = deserialize_fn {
3602                    transform(deserialize_fn);
3603                }
3604            }
3605            HydroNode::Counter { duration, .. } => {
3606                transform(duration);
3607            }
3608        }
3609    }
3610
3611    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3612        &self.metadata().op
3613    }
3614
3615    pub fn metadata(&self) -> &HydroIrMetadata {
3616        match self {
3617            HydroNode::Placeholder => {
3618                panic!()
3619            }
3620            HydroNode::Cast { metadata, .. } => metadata,
3621            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3622            HydroNode::Source { metadata, .. } => metadata,
3623            HydroNode::SingletonSource { metadata, .. } => metadata,
3624            HydroNode::CycleSource { metadata, .. } => metadata,
3625            HydroNode::Tee { metadata, .. } => metadata,
3626            HydroNode::YieldConcat { metadata, .. } => metadata,
3627            HydroNode::BeginAtomic { metadata, .. } => metadata,
3628            HydroNode::EndAtomic { metadata, .. } => metadata,
3629            HydroNode::Batch { metadata, .. } => metadata,
3630            HydroNode::Chain { metadata, .. } => metadata,
3631            HydroNode::ChainFirst { metadata, .. } => metadata,
3632            HydroNode::CrossProduct { metadata, .. } => metadata,
3633            HydroNode::CrossSingleton { metadata, .. } => metadata,
3634            HydroNode::Join { metadata, .. } => metadata,
3635            HydroNode::Difference { metadata, .. } => metadata,
3636            HydroNode::AntiJoin { metadata, .. } => metadata,
3637            HydroNode::ResolveFutures { metadata, .. } => metadata,
3638            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3639            HydroNode::Map { metadata, .. } => metadata,
3640            HydroNode::FlatMap { metadata, .. } => metadata,
3641            HydroNode::Filter { metadata, .. } => metadata,
3642            HydroNode::FilterMap { metadata, .. } => metadata,
3643            HydroNode::DeferTick { metadata, .. } => metadata,
3644            HydroNode::Enumerate { metadata, .. } => metadata,
3645            HydroNode::Inspect { metadata, .. } => metadata,
3646            HydroNode::Unique { metadata, .. } => metadata,
3647            HydroNode::Sort { metadata, .. } => metadata,
3648            HydroNode::Scan { metadata, .. } => metadata,
3649            HydroNode::Fold { metadata, .. } => metadata,
3650            HydroNode::FoldKeyed { metadata, .. } => metadata,
3651            HydroNode::Reduce { metadata, .. } => metadata,
3652            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3653            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3654            HydroNode::ExternalInput { metadata, .. } => metadata,
3655            HydroNode::Network { metadata, .. } => metadata,
3656            HydroNode::Counter { metadata, .. } => metadata,
3657        }
3658    }
3659
3660    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3661        &mut self.metadata_mut().op
3662    }
3663
3664    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3665        match self {
3666            HydroNode::Placeholder => {
3667                panic!()
3668            }
3669            HydroNode::Cast { metadata, .. } => metadata,
3670            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3671            HydroNode::Source { metadata, .. } => metadata,
3672            HydroNode::SingletonSource { metadata, .. } => metadata,
3673            HydroNode::CycleSource { metadata, .. } => metadata,
3674            HydroNode::Tee { metadata, .. } => metadata,
3675            HydroNode::YieldConcat { metadata, .. } => metadata,
3676            HydroNode::BeginAtomic { metadata, .. } => metadata,
3677            HydroNode::EndAtomic { metadata, .. } => metadata,
3678            HydroNode::Batch { metadata, .. } => metadata,
3679            HydroNode::Chain { metadata, .. } => metadata,
3680            HydroNode::ChainFirst { metadata, .. } => metadata,
3681            HydroNode::CrossProduct { metadata, .. } => metadata,
3682            HydroNode::CrossSingleton { metadata, .. } => metadata,
3683            HydroNode::Join { metadata, .. } => metadata,
3684            HydroNode::Difference { metadata, .. } => metadata,
3685            HydroNode::AntiJoin { metadata, .. } => metadata,
3686            HydroNode::ResolveFutures { metadata, .. } => metadata,
3687            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3688            HydroNode::Map { metadata, .. } => metadata,
3689            HydroNode::FlatMap { metadata, .. } => metadata,
3690            HydroNode::Filter { metadata, .. } => metadata,
3691            HydroNode::FilterMap { metadata, .. } => metadata,
3692            HydroNode::DeferTick { metadata, .. } => metadata,
3693            HydroNode::Enumerate { metadata, .. } => metadata,
3694            HydroNode::Inspect { metadata, .. } => metadata,
3695            HydroNode::Unique { metadata, .. } => metadata,
3696            HydroNode::Sort { metadata, .. } => metadata,
3697            HydroNode::Scan { metadata, .. } => metadata,
3698            HydroNode::Fold { metadata, .. } => metadata,
3699            HydroNode::FoldKeyed { metadata, .. } => metadata,
3700            HydroNode::Reduce { metadata, .. } => metadata,
3701            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3702            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3703            HydroNode::ExternalInput { metadata, .. } => metadata,
3704            HydroNode::Network { metadata, .. } => metadata,
3705            HydroNode::Counter { metadata, .. } => metadata,
3706        }
3707    }
3708
3709    pub fn input(&self) -> Vec<&HydroNode> {
3710        match self {
3711            HydroNode::Placeholder => {
3712                panic!()
3713            }
3714            HydroNode::Source { .. }
3715            | HydroNode::SingletonSource { .. }
3716            | HydroNode::ExternalInput { .. }
3717            | HydroNode::CycleSource { .. }
3718            | HydroNode::Tee { .. } => {
3719                // Tee should find its input in separate special ways
3720                vec![]
3721            }
3722            HydroNode::Cast { inner, .. }
3723            | HydroNode::ObserveNonDet { inner, .. }
3724            | HydroNode::YieldConcat { inner, .. }
3725            | HydroNode::BeginAtomic { inner, .. }
3726            | HydroNode::EndAtomic { inner, .. }
3727            | HydroNode::Batch { inner, .. } => {
3728                vec![inner]
3729            }
3730            HydroNode::Chain { first, second, .. } => {
3731                vec![first, second]
3732            }
3733            HydroNode::ChainFirst { first, second, .. } => {
3734                vec![first, second]
3735            }
3736            HydroNode::CrossProduct { left, right, .. }
3737            | HydroNode::CrossSingleton { left, right, .. }
3738            | HydroNode::Join { left, right, .. } => {
3739                vec![left, right]
3740            }
3741            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3742                vec![pos, neg]
3743            }
3744            HydroNode::Map { input, .. }
3745            | HydroNode::FlatMap { input, .. }
3746            | HydroNode::Filter { input, .. }
3747            | HydroNode::FilterMap { input, .. }
3748            | HydroNode::Sort { input, .. }
3749            | HydroNode::DeferTick { input, .. }
3750            | HydroNode::Enumerate { input, .. }
3751            | HydroNode::Inspect { input, .. }
3752            | HydroNode::Unique { input, .. }
3753            | HydroNode::Network { input, .. }
3754            | HydroNode::Counter { input, .. }
3755            | HydroNode::ResolveFutures { input, .. }
3756            | HydroNode::ResolveFuturesOrdered { input, .. }
3757            | HydroNode::Fold { input, .. }
3758            | HydroNode::FoldKeyed { input, .. }
3759            | HydroNode::Reduce { input, .. }
3760            | HydroNode::ReduceKeyed { input, .. }
3761            | HydroNode::Scan { input, .. } => {
3762                vec![input]
3763            }
3764            HydroNode::ReduceKeyedWatermark {
3765                input, watermark, ..
3766            } => {
3767                vec![input, watermark]
3768            }
3769        }
3770    }
3771
3772    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3773        self.input()
3774            .iter()
3775            .map(|input_node| input_node.metadata())
3776            .collect()
3777    }
3778
3779    pub fn print_root(&self) -> String {
3780        match self {
3781            HydroNode::Placeholder => {
3782                panic!()
3783            }
3784            HydroNode::Cast { .. } => "Cast()".to_owned(),
3785            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
3786            HydroNode::Source { source, .. } => format!("Source({:?})", source),
3787            HydroNode::SingletonSource { value, .. } => format!("SingletonSource({:?})", value),
3788            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
3789            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3790            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
3791            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
3792            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
3793            HydroNode::Batch { .. } => "Batch()".to_owned(),
3794            HydroNode::Chain { first, second, .. } => {
3795                format!("Chain({}, {})", first.print_root(), second.print_root())
3796            }
3797            HydroNode::ChainFirst { first, second, .. } => {
3798                format!(
3799                    "ChainFirst({}, {})",
3800                    first.print_root(),
3801                    second.print_root()
3802                )
3803            }
3804            HydroNode::CrossProduct { left, right, .. } => {
3805                format!(
3806                    "CrossProduct({}, {})",
3807                    left.print_root(),
3808                    right.print_root()
3809                )
3810            }
3811            HydroNode::CrossSingleton { left, right, .. } => {
3812                format!(
3813                    "CrossSingleton({}, {})",
3814                    left.print_root(),
3815                    right.print_root()
3816                )
3817            }
3818            HydroNode::Join { left, right, .. } => {
3819                format!("Join({}, {})", left.print_root(), right.print_root())
3820            }
3821            HydroNode::Difference { pos, neg, .. } => {
3822                format!("Difference({}, {})", pos.print_root(), neg.print_root())
3823            }
3824            HydroNode::AntiJoin { pos, neg, .. } => {
3825                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3826            }
3827            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
3828            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
3829            HydroNode::Map { f, .. } => format!("Map({:?})", f),
3830            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3831            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3832            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3833            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
3834            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
3835            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3836            HydroNode::Unique { .. } => "Unique()".to_owned(),
3837            HydroNode::Sort { .. } => "Sort()".to_owned(),
3838            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3839            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3840            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3841            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3842            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3843            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3844            HydroNode::Network { .. } => "Network()".to_owned(),
3845            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
3846            HydroNode::Counter { tag, duration, .. } => {
3847                format!("Counter({:?}, {:?})", tag, duration)
3848            }
3849        }
3850    }
3851}
3852
/// Builds the sink/source expressions and the connect callback for one network edge.
///
/// Each endpoint is resolved to its node by key: `LocationId::Process` keys are looked up in
/// `processes`, `LocationId::Cluster` keys in `clusters`. A port is then taken from each node
/// with `next_port()` (sender first, receiver second), and the deploy provider `D` is asked for
/// the matching pair of calls:
///
/// | from    | to      | calls                                  |
/// |---------|---------|----------------------------------------|
/// | process | process | `o2o_sink_source` / `o2o_connect`      |
/// | process | cluster | `o2m_sink_source` / `o2m_connect`      |
/// | cluster | process | `m2o_sink_source` / `m2o_connect`      |
/// | cluster | cluster | `m2m_sink_source` / `m2m_connect`      |
///
/// `env` and `name` are forwarded only in the process-to-process case.
///
/// Returns `(sink, source, connect_fn)`.
///
/// # Panics
///
/// Panics if an endpoint's key is absent from `processes` / `clusters`, or if either location is
/// a `LocationId::Tick` or `LocationId::Atomic`.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Resolve a process key to an owned copy of its node, or abort with a descriptive message.
    let process_for = |key: LocationKey| {
        processes
            .get(key)
            .unwrap_or_else(|| {
                panic!("A process used in the graph was not instantiated: {}", key)
            })
            .clone()
    };
    // Same as above, for cluster keys.
    let cluster_for = |key: LocationKey| {
        clusters
            .get(key)
            .unwrap_or_else(|| {
                panic!("A cluster used in the graph was not instantiated: {}", key)
            })
            .clone()
    };

    let (endpoints, connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let sender = process_for(from);
            let receiver = process_for(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            (
                D::o2o_sink_source(env, &sender, &sink_port, &receiver, &source_port, name),
                D::o2o_connect(&sender, &sink_port, &receiver, &source_port),
            )
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let sender = process_for(from);
            let receiver = cluster_for(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            (
                D::o2m_sink_source(&sender, &sink_port, &receiver, &source_port),
                D::o2m_connect(&sender, &sink_port, &receiver, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let sender = cluster_for(from);
            let receiver = process_for(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            (
                D::m2o_sink_source(&sender, &sink_port, &receiver, &source_port),
                D::m2o_connect(&sender, &sink_port, &receiver, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let sender = cluster_for(from);
            let receiver = cluster_for(to);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            (
                D::m2m_sink_source(&sender, &sink_port, &receiver, &source_port),
                D::m2m_connect(&sender, &sink_port, &receiver, &source_port),
            )
        }
        // Only process and cluster endpoints are handled; anything else aborts.
        (LocationId::Tick(_, _) | LocationId::Atomic(_), _) => panic!(),
        (_, LocationId::Tick(_, _) | LocationId::Atomic(_)) => panic!(),
    };

    let (sink, source) = endpoints;
    (sink, source, connect_fn)
}
3961
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins the in-memory size of [`HydroNode`] to 240 bytes.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        let node_bytes = size_of::<HydroNode>();
        assert_eq!(node_bytes, 240);
    }

    /// Pins the in-memory size of [`HydroRoot`] to 136 bytes.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        let root_bytes = size_of::<HydroRoot>();
        assert_eq!(root_bytes, 136);
    }

    /// An expression containing no `q!` expansion must come back from `simplify_q_macro`
    /// unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let simple_expr = syn::parse_str::<syn::Expr>("x + y").unwrap();
        let result = simplify_q_macro(simple_expr.clone());
        assert_eq!(result, simple_expr);
    }

    /// Snapshots the simplified form of an expression spliced from a real `q!` closure.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        // A simplified version of what a real stageleft call might look like.
        let spliced = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        // The visitor is expected to recognize the stageleft::runtime_support::fn_* pattern
        // and collapse the expression to q!(...).
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Snapshots the simplified form when the quoted code is a block that evaluates to a
    /// closure, so it does not start with a pipe.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let spliced = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}