Skip to main content

hydro_lang/deploy/
deploy_graph.rs

1//! Deployment backend for Hydro that uses [`hydro_deploy`] to provision and launch services.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::future::Future;
6use std::io::Error;
7use std::pin::Pin;
8use std::rc::Rc;
9use std::sync::Arc;
10
11use bytes::{Bytes, BytesMut};
12use dfir_lang::graph::DfirGraph;
13use futures::{Sink, SinkExt, Stream, StreamExt};
14use hydro_deploy::custom_service::CustomClientPort;
15use hydro_deploy::rust_crate::RustCrateService;
16use hydro_deploy::rust_crate::ports::{DemuxSink, RustCrateSink, RustCrateSource, TaggedSource};
17use hydro_deploy::rust_crate::tracing_options::TracingOptions;
18use hydro_deploy::{CustomService, Deployment, Host, RustCrate};
19use hydro_deploy_integration::{ConnectedSink, ConnectedSource};
20use nameof::name_of;
21use proc_macro2::Span;
22use serde::Serialize;
23use serde::de::DeserializeOwned;
24use slotmap::SparseSecondaryMap;
25use stageleft::{QuotedWithContext, RuntimeData};
26use syn::parse_quote;
27
28use super::deploy_runtime::*;
29use crate::compile::builder::ExternalPortId;
30use crate::compile::deploy_provider::{
31    ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
32};
33use crate::compile::trybuild::generate::{
34    HYDRO_RUNTIME_FEATURES, LinkingMode, create_graph_trybuild,
35};
36use crate::location::dynamic::LocationId;
37use crate::location::member_id::TaglessMemberId;
38use crate::location::{LocationKey, MembershipEvent, NetworkHint};
39use crate::staging_util::get_this_crate;
40
41/// Deployment backend that uses [`hydro_deploy`] for provisioning and launching.
42///
43/// Automatically used when you call [`crate::compile::builder::FlowBuilder::deploy`] and pass in
44/// an `&mut` reference to [`hydro_deploy::Deployment`] as the deployment context.
45pub enum HydroDeploy {}
46
47impl<'a> Deploy<'a> for HydroDeploy {
48    /// Map from Cluster location ID to member IDs.
49    type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
50    type InstantiateEnv = Deployment;
51
52    type Process = DeployNode;
53    type Cluster = DeployCluster;
54    type External = DeployExternal;
55
56    fn o2o_sink_source(
57        _env: &mut Self::InstantiateEnv,
58        _p1: &Self::Process,
59        p1_port: &<Self::Process as Node>::Port,
60        _p2: &Self::Process,
61        p2_port: &<Self::Process as Node>::Port,
62        _name: Option<&str>,
63    ) -> (syn::Expr, syn::Expr) {
64        let p1_port = p1_port.as_str();
65        let p2_port = p2_port.as_str();
66        deploy_o2o(
67            RuntimeData::new("__hydro_lang_trybuild_cli"),
68            p1_port,
69            p2_port,
70        )
71    }
72
73    fn o2o_connect(
74        p1: &Self::Process,
75        p1_port: &<Self::Process as Node>::Port,
76        p2: &Self::Process,
77        p2_port: &<Self::Process as Node>::Port,
78    ) -> Box<dyn FnOnce()> {
79        let p1 = p1.clone();
80        let p1_port = p1_port.clone();
81        let p2 = p2.clone();
82        let p2_port = p2_port.clone();
83
84        Box::new(move || {
85            let self_underlying_borrow = p1.underlying.borrow();
86            let self_underlying = self_underlying_borrow.as_ref().unwrap();
87            let source_port = self_underlying.get_port(p1_port.clone());
88
89            let other_underlying_borrow = p2.underlying.borrow();
90            let other_underlying = other_underlying_borrow.as_ref().unwrap();
91            let recipient_port = other_underlying.get_port(p2_port.clone());
92
93            source_port.send_to(&recipient_port)
94        })
95    }
96
97    fn o2m_sink_source(
98        _p1: &Self::Process,
99        p1_port: &<Self::Process as Node>::Port,
100        _c2: &Self::Cluster,
101        c2_port: &<Self::Cluster as Node>::Port,
102    ) -> (syn::Expr, syn::Expr) {
103        let p1_port = p1_port.as_str();
104        let c2_port = c2_port.as_str();
105        deploy_o2m(
106            RuntimeData::new("__hydro_lang_trybuild_cli"),
107            p1_port,
108            c2_port,
109        )
110    }
111
112    fn o2m_connect(
113        p1: &Self::Process,
114        p1_port: &<Self::Process as Node>::Port,
115        c2: &Self::Cluster,
116        c2_port: &<Self::Cluster as Node>::Port,
117    ) -> Box<dyn FnOnce()> {
118        let p1 = p1.clone();
119        let p1_port = p1_port.clone();
120        let c2 = c2.clone();
121        let c2_port = c2_port.clone();
122
123        Box::new(move || {
124            let self_underlying_borrow = p1.underlying.borrow();
125            let self_underlying = self_underlying_borrow.as_ref().unwrap();
126            let source_port = self_underlying.get_port(p1_port.clone());
127
128            let recipient_port = DemuxSink {
129                demux: c2
130                    .members
131                    .borrow()
132                    .iter()
133                    .enumerate()
134                    .map(|(id, c)| {
135                        (
136                            id as u32,
137                            Arc::new(c.underlying.get_port(c2_port.clone()))
138                                as Arc<dyn RustCrateSink + 'static>,
139                        )
140                    })
141                    .collect(),
142            };
143
144            source_port.send_to(&recipient_port)
145        })
146    }
147
148    fn m2o_sink_source(
149        _c1: &Self::Cluster,
150        c1_port: &<Self::Cluster as Node>::Port,
151        _p2: &Self::Process,
152        p2_port: &<Self::Process as Node>::Port,
153    ) -> (syn::Expr, syn::Expr) {
154        let c1_port = c1_port.as_str();
155        let p2_port = p2_port.as_str();
156        deploy_m2o(
157            RuntimeData::new("__hydro_lang_trybuild_cli"),
158            c1_port,
159            p2_port,
160        )
161    }
162
163    fn m2o_connect(
164        c1: &Self::Cluster,
165        c1_port: &<Self::Cluster as Node>::Port,
166        p2: &Self::Process,
167        p2_port: &<Self::Process as Node>::Port,
168    ) -> Box<dyn FnOnce()> {
169        let c1 = c1.clone();
170        let c1_port = c1_port.clone();
171        let p2 = p2.clone();
172        let p2_port = p2_port.clone();
173
174        Box::new(move || {
175            let other_underlying_borrow = p2.underlying.borrow();
176            let other_underlying = other_underlying_borrow.as_ref().unwrap();
177            let recipient_port = other_underlying.get_port(p2_port.clone()).merge();
178
179            for (i, node) in c1.members.borrow().iter().enumerate() {
180                let source_port = node.underlying.get_port(c1_port.clone());
181
182                TaggedSource {
183                    source: Arc::new(source_port),
184                    tag: i as u32,
185                }
186                .send_to(&recipient_port);
187            }
188        })
189    }
190
191    fn m2m_sink_source(
192        _c1: &Self::Cluster,
193        c1_port: &<Self::Cluster as Node>::Port,
194        _c2: &Self::Cluster,
195        c2_port: &<Self::Cluster as Node>::Port,
196    ) -> (syn::Expr, syn::Expr) {
197        let c1_port = c1_port.as_str();
198        let c2_port = c2_port.as_str();
199        deploy_m2m(
200            RuntimeData::new("__hydro_lang_trybuild_cli"),
201            c1_port,
202            c2_port,
203        )
204    }
205
206    fn m2m_connect(
207        c1: &Self::Cluster,
208        c1_port: &<Self::Cluster as Node>::Port,
209        c2: &Self::Cluster,
210        c2_port: &<Self::Cluster as Node>::Port,
211    ) -> Box<dyn FnOnce()> {
212        let c1 = c1.clone();
213        let c1_port = c1_port.clone();
214        let c2 = c2.clone();
215        let c2_port = c2_port.clone();
216
217        Box::new(move || {
218            for (i, sender) in c1.members.borrow().iter().enumerate() {
219                let source_port = sender.underlying.get_port(c1_port.clone());
220
221                let recipient_port = DemuxSink {
222                    demux: c2
223                        .members
224                        .borrow()
225                        .iter()
226                        .enumerate()
227                        .map(|(id, c)| {
228                            (
229                                id as u32,
230                                Arc::new(c.underlying.get_port(c2_port.clone()).merge())
231                                    as Arc<dyn RustCrateSink + 'static>,
232                            )
233                        })
234                        .collect(),
235                };
236
237                TaggedSource {
238                    source: Arc::new(source_port),
239                    tag: i as u32,
240                }
241                .send_to(&recipient_port);
242            }
243        })
244    }
245
246    fn e2o_many_source(
247        extra_stmts: &mut Vec<syn::Stmt>,
248        _p2: &Self::Process,
249        p2_port: &<Self::Process as Node>::Port,
250        codec_type: &syn::Type,
251        shared_handle: String,
252    ) -> syn::Expr {
253        let connect_ident = syn::Ident::new(
254            &format!("__hydro_deploy_many_{}_connect", &shared_handle),
255            Span::call_site(),
256        );
257        let source_ident = syn::Ident::new(
258            &format!("__hydro_deploy_many_{}_source", &shared_handle),
259            Span::call_site(),
260        );
261        let sink_ident = syn::Ident::new(
262            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
263            Span::call_site(),
264        );
265        let membership_ident = syn::Ident::new(
266            &format!("__hydro_deploy_many_{}_membership", &shared_handle),
267            Span::call_site(),
268        );
269
270        let root = get_this_crate();
271
272        extra_stmts.push(syn::parse_quote! {
273            let #connect_ident = __hydro_lang_trybuild_cli
274                .port(#p2_port)
275                .connect::<#root::runtime_support::hydro_deploy_integration::multi_connection::ConnectedMultiConnection<_, _, #codec_type>>();
276        });
277
278        extra_stmts.push(syn::parse_quote! {
279            let #source_ident = #connect_ident.source;
280        });
281
282        extra_stmts.push(syn::parse_quote! {
283            let #sink_ident = #connect_ident.sink;
284        });
285
286        extra_stmts.push(syn::parse_quote! {
287            let #membership_ident = #connect_ident.membership;
288        });
289
290        parse_quote!(#source_ident)
291    }
292
293    fn e2o_many_sink(shared_handle: String) -> syn::Expr {
294        let sink_ident = syn::Ident::new(
295            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
296            Span::call_site(),
297        );
298        parse_quote!(#sink_ident)
299    }
300
301    fn e2o_source(
302        extra_stmts: &mut Vec<syn::Stmt>,
303        _p1: &Self::External,
304        _p1_port: &<Self::External as Node>::Port,
305        _p2: &Self::Process,
306        p2_port: &<Self::Process as Node>::Port,
307        codec_type: &syn::Type,
308        shared_handle: String,
309    ) -> syn::Expr {
310        let connect_ident = syn::Ident::new(
311            &format!("__hydro_deploy_{}_connect", &shared_handle),
312            Span::call_site(),
313        );
314        let source_ident = syn::Ident::new(
315            &format!("__hydro_deploy_{}_source", &shared_handle),
316            Span::call_site(),
317        );
318        let sink_ident = syn::Ident::new(
319            &format!("__hydro_deploy_{}_sink", &shared_handle),
320            Span::call_site(),
321        );
322
323        let root = get_this_crate();
324
325        extra_stmts.push(syn::parse_quote! {
326            let #connect_ident = __hydro_lang_trybuild_cli
327                .port(#p2_port)
328                .connect::<#root::runtime_support::hydro_deploy_integration::single_connection::ConnectedSingleConnection<_, _, #codec_type>>();
329        });
330
331        extra_stmts.push(syn::parse_quote! {
332            let #source_ident = #connect_ident.source;
333        });
334
335        extra_stmts.push(syn::parse_quote! {
336            let #sink_ident = #connect_ident.sink;
337        });
338
339        parse_quote!(#source_ident)
340    }
341
342    fn e2o_connect(
343        p1: &Self::External,
344        p1_port: &<Self::External as Node>::Port,
345        p2: &Self::Process,
346        p2_port: &<Self::Process as Node>::Port,
347        _many: bool,
348        server_hint: NetworkHint,
349    ) -> Box<dyn FnOnce()> {
350        let p1 = p1.clone();
351        let p1_port = p1_port.clone();
352        let p2 = p2.clone();
353        let p2_port = p2_port.clone();
354
355        Box::new(move || {
356            let self_underlying_borrow = p1.underlying.borrow();
357            let self_underlying = self_underlying_borrow.as_ref().unwrap();
358            let source_port = self_underlying.declare_many_client();
359
360            let other_underlying_borrow = p2.underlying.borrow();
361            let other_underlying = other_underlying_borrow.as_ref().unwrap();
362            let recipient_port = other_underlying.get_port_with_hint(
363                p2_port.clone(),
364                match server_hint {
365                    NetworkHint::Auto => hydro_deploy::PortNetworkHint::Auto,
366                    NetworkHint::TcpPort(p) => hydro_deploy::PortNetworkHint::TcpPort(p),
367                },
368            );
369
370            source_port.send_to(&recipient_port);
371
372            p1.client_ports
373                .borrow_mut()
374                .insert(p1_port.clone(), source_port);
375        })
376    }
377
378    fn o2e_sink(
379        _p1: &Self::Process,
380        _p1_port: &<Self::Process as Node>::Port,
381        _p2: &Self::External,
382        _p2_port: &<Self::External as Node>::Port,
383        shared_handle: String,
384    ) -> syn::Expr {
385        let sink_ident = syn::Ident::new(
386            &format!("__hydro_deploy_{}_sink", &shared_handle),
387            Span::call_site(),
388        );
389        parse_quote!(#sink_ident)
390    }
391
392    fn cluster_ids(
393        of_cluster: LocationKey,
394    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
395        cluster_members(RuntimeData::new("__hydro_lang_trybuild_cli"), of_cluster)
396    }
397
398    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
399        cluster_self_id(RuntimeData::new("__hydro_lang_trybuild_cli"))
400    }
401
402    fn cluster_membership_stream(
403        location_id: &LocationId,
404    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
405    {
406        cluster_membership_stream(location_id)
407    }
408}
409
410#[expect(missing_docs, reason = "TODO")]
411pub trait DeployCrateWrapper {
412    fn underlying(&self) -> Arc<RustCrateService>;
413
414    fn stdout(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
415        self.underlying().stdout()
416    }
417
418    fn stderr(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
419        self.underlying().stderr()
420    }
421
422    fn stdout_filter(
423        &self,
424        prefix: impl Into<String>,
425    ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
426        self.underlying().stdout_filter(prefix.into())
427    }
428
429    fn stderr_filter(
430        &self,
431        prefix: impl Into<String>,
432    ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
433        self.underlying().stderr_filter(prefix.into())
434    }
435}
436
437#[expect(missing_docs, reason = "TODO")]
438#[derive(Clone)]
439pub struct TrybuildHost {
440    host: Arc<dyn Host>,
441    display_name: Option<String>,
442    rustflags: Option<String>,
443    profile: Option<String>,
444    additional_hydro_features: Vec<String>,
445    features: Vec<String>,
446    tracing: Option<TracingOptions>,
447    build_envs: Vec<(String, String)>,
448    name_hint: Option<String>,
449    cluster_idx: Option<usize>,
450}
451
452impl From<Arc<dyn Host>> for TrybuildHost {
453    fn from(host: Arc<dyn Host>) -> Self {
454        Self {
455            host,
456            display_name: None,
457            rustflags: None,
458            profile: None,
459            additional_hydro_features: vec![],
460            features: vec![],
461            tracing: None,
462            build_envs: vec![],
463            name_hint: None,
464            cluster_idx: None,
465        }
466    }
467}
468
469impl<H: Host + 'static> From<Arc<H>> for TrybuildHost {
470    fn from(host: Arc<H>) -> Self {
471        Self {
472            host,
473            display_name: None,
474            rustflags: None,
475            profile: None,
476            additional_hydro_features: vec![],
477            features: vec![],
478            tracing: None,
479            build_envs: vec![],
480            name_hint: None,
481            cluster_idx: None,
482        }
483    }
484}
485
486#[expect(missing_docs, reason = "TODO")]
487impl TrybuildHost {
488    pub fn new(host: Arc<dyn Host>) -> Self {
489        Self {
490            host,
491            display_name: None,
492            rustflags: None,
493            profile: None,
494            additional_hydro_features: vec![],
495            features: vec![],
496            tracing: None,
497            build_envs: vec![],
498            name_hint: None,
499            cluster_idx: None,
500        }
501    }
502
503    pub fn display_name(self, display_name: impl Into<String>) -> Self {
504        if self.display_name.is_some() {
505            panic!("{} already set", name_of!(display_name in Self));
506        }
507
508        Self {
509            display_name: Some(display_name.into()),
510            ..self
511        }
512    }
513
514    pub fn rustflags(self, rustflags: impl Into<String>) -> Self {
515        if self.rustflags.is_some() {
516            panic!("{} already set", name_of!(rustflags in Self));
517        }
518
519        Self {
520            rustflags: Some(rustflags.into()),
521            ..self
522        }
523    }
524
525    pub fn profile(self, profile: impl Into<String>) -> Self {
526        if self.profile.is_some() {
527            panic!("{} already set", name_of!(profile in Self));
528        }
529
530        Self {
531            profile: Some(profile.into()),
532            ..self
533        }
534    }
535
536    pub fn additional_hydro_features(self, additional_hydro_features: Vec<String>) -> Self {
537        Self {
538            additional_hydro_features,
539            ..self
540        }
541    }
542
543    pub fn features(self, features: Vec<String>) -> Self {
544        Self {
545            features: self.features.into_iter().chain(features).collect(),
546            ..self
547        }
548    }
549
550    pub fn tracing(self, tracing: TracingOptions) -> Self {
551        if self.tracing.is_some() {
552            panic!("{} already set", name_of!(tracing in Self));
553        }
554
555        Self {
556            tracing: Some(tracing),
557            ..self
558        }
559    }
560
561    pub fn build_env(self, key: impl Into<String>, value: impl Into<String>) -> Self {
562        Self {
563            build_envs: self
564                .build_envs
565                .into_iter()
566                .chain(std::iter::once((key.into(), value.into())))
567                .collect(),
568            ..self
569        }
570    }
571}
572
573impl IntoProcessSpec<'_, HydroDeploy> for Arc<dyn Host> {
574    type ProcessSpec = TrybuildHost;
575    fn into_process_spec(self) -> TrybuildHost {
576        TrybuildHost {
577            host: self,
578            display_name: None,
579            rustflags: None,
580            profile: None,
581            additional_hydro_features: vec![],
582            features: vec![],
583            tracing: None,
584            build_envs: vec![],
585            name_hint: None,
586            cluster_idx: None,
587        }
588    }
589}
590
591impl<H: Host + 'static> IntoProcessSpec<'_, HydroDeploy> for Arc<H> {
592    type ProcessSpec = TrybuildHost;
593    fn into_process_spec(self) -> TrybuildHost {
594        TrybuildHost {
595            host: self,
596            display_name: None,
597            rustflags: None,
598            profile: None,
599            additional_hydro_features: vec![],
600            features: vec![],
601            tracing: None,
602            build_envs: vec![],
603            name_hint: None,
604            cluster_idx: None,
605        }
606    }
607}
608
609#[expect(missing_docs, reason = "TODO")]
610#[derive(Clone)]
611pub struct DeployExternal {
612    next_port: Rc<RefCell<usize>>,
613    host: Arc<dyn Host>,
614    underlying: Rc<RefCell<Option<Arc<CustomService>>>>,
615    client_ports: Rc<RefCell<HashMap<String, CustomClientPort>>>,
616    allocated_ports: Rc<RefCell<HashMap<ExternalPortId, String>>>,
617}
618
619impl DeployExternal {
620    pub(crate) fn raw_port(&self, external_port_id: ExternalPortId) -> CustomClientPort {
621        self.client_ports
622            .borrow()
623            .get(
624                self.allocated_ports
625                    .borrow()
626                    .get(&external_port_id)
627                    .unwrap(),
628            )
629            .unwrap()
630            .clone()
631    }
632}
633
634impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
635    fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
636        assert!(
637            self.allocated_ports
638                .borrow_mut()
639                .insert(external_port_id, port.clone())
640                .is_none_or(|old| old == port)
641        );
642    }
643
644    fn as_bytes_bidi(
645        &self,
646        external_port_id: ExternalPortId,
647    ) -> impl Future<
648        Output = (
649            Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
650            Pin<Box<dyn Sink<Bytes, Error = Error>>>,
651        ),
652    > + 'a {
653        let port = self.raw_port(external_port_id);
654
655        async move {
656            let (source, sink) = port.connect().await.into_source_sink();
657            (
658                Box::pin(source) as Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
659                Box::pin(sink) as Pin<Box<dyn Sink<Bytes, Error = Error>>>,
660            )
661        }
662    }
663
664    fn as_bincode_bidi<InT, OutT>(
665        &self,
666        external_port_id: ExternalPortId,
667    ) -> impl Future<
668        Output = (
669            Pin<Box<dyn Stream<Item = OutT>>>,
670            Pin<Box<dyn Sink<InT, Error = Error>>>,
671        ),
672    > + 'a
673    where
674        InT: Serialize + 'static,
675        OutT: DeserializeOwned + 'static,
676    {
677        let port = self.raw_port(external_port_id);
678        async move {
679            let (source, sink) = port.connect().await.into_source_sink();
680            (
681                Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
682                    as Pin<Box<dyn Stream<Item = OutT>>>,
683                Box::pin(
684                    sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }),
685                ) as Pin<Box<dyn Sink<InT, Error = Error>>>,
686            )
687        }
688    }
689
690    fn as_bincode_sink<T: Serialize + 'static>(
691        &self,
692        external_port_id: ExternalPortId,
693    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
694        let port = self.raw_port(external_port_id);
695        async move {
696            let sink = port.connect().await.into_sink();
697            Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }))
698                as Pin<Box<dyn Sink<T, Error = Error>>>
699        }
700    }
701
702    fn as_bincode_source<T: DeserializeOwned + 'static>(
703        &self,
704        external_port_id: ExternalPortId,
705    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
706        let port = self.raw_port(external_port_id);
707        async move {
708            let source = port.connect().await.into_source();
709            Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
710                as Pin<Box<dyn Stream<Item = T>>>
711        }
712    }
713}
714
715impl Node for DeployExternal {
716    type Port = String;
717    /// Map from Cluster location ID to member IDs.
718    type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
719    type InstantiateEnv = Deployment;
720
721    fn next_port(&self) -> Self::Port {
722        let next_port = *self.next_port.borrow();
723        *self.next_port.borrow_mut() += 1;
724
725        format!("port_{}", next_port)
726    }
727
728    fn instantiate(
729        &self,
730        env: &mut Self::InstantiateEnv,
731        _meta: &mut Self::Meta,
732        _graph: DfirGraph,
733        extra_stmts: &[syn::Stmt],
734        sidecars: &[syn::Expr],
735    ) {
736        assert!(extra_stmts.is_empty());
737        assert!(sidecars.is_empty());
738        let service = env.CustomService(self.host.clone(), vec![]);
739        *self.underlying.borrow_mut() = Some(service);
740    }
741
742    fn update_meta(&self, _meta: &Self::Meta) {}
743}
744
745impl ExternalSpec<'_, HydroDeploy> for Arc<dyn Host> {
746    fn build(self, _key: LocationKey, _name_hint: &str) -> DeployExternal {
747        DeployExternal {
748            next_port: Rc::new(RefCell::new(0)),
749            host: self,
750            underlying: Rc::new(RefCell::new(None)),
751            allocated_ports: Rc::new(RefCell::new(HashMap::new())),
752            client_ports: Rc::new(RefCell::new(HashMap::new())),
753        }
754    }
755}
756
757impl<H: Host + 'static> ExternalSpec<'_, HydroDeploy> for Arc<H> {
758    fn build(self, _key: LocationKey, _name_hint: &str) -> DeployExternal {
759        DeployExternal {
760            next_port: Rc::new(RefCell::new(0)),
761            host: self,
762            underlying: Rc::new(RefCell::new(None)),
763            allocated_ports: Rc::new(RefCell::new(HashMap::new())),
764            client_ports: Rc::new(RefCell::new(HashMap::new())),
765        }
766    }
767}
768
769pub(crate) enum CrateOrTrybuild {
770    Crate(RustCrate, Arc<dyn Host>),
771    Trybuild(TrybuildHost),
772}
773
774#[expect(missing_docs, reason = "TODO")]
775#[derive(Clone)]
776pub struct DeployNode {
777    next_port: Rc<RefCell<usize>>,
778    service_spec: Rc<RefCell<Option<CrateOrTrybuild>>>,
779    underlying: Rc<RefCell<Option<Arc<RustCrateService>>>>,
780}
781
782impl DeployCrateWrapper for DeployNode {
783    fn underlying(&self) -> Arc<RustCrateService> {
784        Arc::clone(self.underlying.borrow().as_ref().unwrap())
785    }
786}
787
788impl Node for DeployNode {
789    type Port = String;
790    /// Map from Cluster location ID to member IDs.
791    type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
792    type InstantiateEnv = Deployment;
793
794    fn next_port(&self) -> String {
795        let next_port = *self.next_port.borrow();
796        *self.next_port.borrow_mut() += 1;
797
798        format!("port_{}", next_port)
799    }
800
801    fn update_meta(&self, meta: &Self::Meta) {
802        let underlying_node = self.underlying.borrow();
803        underlying_node.as_ref().unwrap().update_meta(HydroMeta {
804            clusters: meta.clone(),
805            cluster_id: None,
806        });
807    }
808
809    fn instantiate(
810        &self,
811        env: &mut Self::InstantiateEnv,
812        _meta: &mut Self::Meta,
813        graph: DfirGraph,
814        extra_stmts: &[syn::Stmt],
815        sidecars: &[syn::Expr],
816    ) {
817        let (service, host) = match self.service_spec.borrow_mut().take().unwrap() {
818            CrateOrTrybuild::Crate(c, host) => (c, host),
819            CrateOrTrybuild::Trybuild(trybuild) => {
820                // Determine linking mode based on host target type
821                let linking_mode = if !cfg!(target_os = "windows")
822                    && trybuild.host.target_type() == hydro_deploy::HostTargetType::Local
823                {
824                    // When compiling for local, prefer dynamic linking to reduce binary size
825                    // Windows is currently not supported due to https://github.com/bevyengine/bevy/pull/2016
826                    LinkingMode::Dynamic
827                } else {
828                    LinkingMode::Static
829                };
830                let (bin_name, config) = create_graph_trybuild(
831                    graph,
832                    extra_stmts,
833                    sidecars,
834                    trybuild.name_hint.as_deref(),
835                    crate::compile::trybuild::generate::DeployMode::HydroDeploy,
836                    linking_mode,
837                );
838                let host = trybuild.host.clone();
839                (
840                    create_trybuild_service(
841                        trybuild,
842                        &config.project_dir,
843                        &config.target_dir,
844                        config.features.as_deref(),
845                        &bin_name,
846                        &config.linking_mode,
847                    ),
848                    host,
849                )
850            }
851        };
852
853        *self.underlying.borrow_mut() = Some(env.add_service(service, host));
854    }
855}
856
857#[expect(missing_docs, reason = "TODO")]
858#[derive(Clone)]
859pub struct DeployClusterNode {
860    underlying: Arc<RustCrateService>,
861}
862
863impl DeployCrateWrapper for DeployClusterNode {
864    fn underlying(&self) -> Arc<RustCrateService> {
865        self.underlying.clone()
866    }
867}
868#[expect(missing_docs, reason = "TODO")]
869#[derive(Clone)]
870pub struct DeployCluster {
871    key: LocationKey,
872    next_port: Rc<RefCell<usize>>,
873    cluster_spec: Rc<RefCell<Option<Vec<CrateOrTrybuild>>>>,
874    members: Rc<RefCell<Vec<DeployClusterNode>>>,
875    name_hint: Option<String>,
876}
877
878impl DeployCluster {
879    #[expect(missing_docs, reason = "TODO")]
880    pub fn members(&self) -> Vec<DeployClusterNode> {
881        self.members.borrow().clone()
882    }
883}
884
885impl Node for DeployCluster {
886    type Port = String;
887    /// Map from Cluster location ID to member IDs.
888    type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
889    type InstantiateEnv = Deployment;
890
891    fn next_port(&self) -> String {
892        let next_port = *self.next_port.borrow();
893        *self.next_port.borrow_mut() += 1;
894
895        format!("port_{}", next_port)
896    }
897
898    fn instantiate(
899        &self,
900        env: &mut Self::InstantiateEnv,
901        meta: &mut Self::Meta,
902        graph: DfirGraph,
903        extra_stmts: &[syn::Stmt],
904        sidecars: &[syn::Expr],
905    ) {
906        let has_trybuild = self
907            .cluster_spec
908            .borrow()
909            .as_ref()
910            .unwrap()
911            .iter()
912            .any(|spec| matches!(spec, CrateOrTrybuild::Trybuild { .. }));
913
914        // For clusters, use static linking if ANY host is non-local (conservative approach)
915        let linking_mode = if !cfg!(target_os = "windows")
916            && self
917                .cluster_spec
918                .borrow()
919                .as_ref()
920                .unwrap()
921                .iter()
922                .all(|spec| match spec {
923                    CrateOrTrybuild::Crate(_, _) => true, // crates handle their own linking
924                    CrateOrTrybuild::Trybuild(t) => {
925                        t.host.target_type() == hydro_deploy::HostTargetType::Local
926                    }
927                }) {
928            // See comment above for Windows exception
929            LinkingMode::Dynamic
930        } else {
931            LinkingMode::Static
932        };
933
934        let maybe_trybuild = if has_trybuild {
935            Some(create_graph_trybuild(
936                graph,
937                extra_stmts,
938                sidecars,
939                self.name_hint.as_deref(),
940                crate::compile::trybuild::generate::DeployMode::HydroDeploy,
941                linking_mode,
942            ))
943        } else {
944            None
945        };
946
947        let cluster_nodes = self
948            .cluster_spec
949            .borrow_mut()
950            .take()
951            .unwrap()
952            .into_iter()
953            .map(|spec| {
954                let (service, host) = match spec {
955                    CrateOrTrybuild::Crate(c, host) => (c, host),
956                    CrateOrTrybuild::Trybuild(trybuild) => {
957                        let (bin_name, config) = maybe_trybuild.as_ref().unwrap();
958                        let host = trybuild.host.clone();
959                        (
960                            create_trybuild_service(
961                                trybuild,
962                                &config.project_dir,
963                                &config.target_dir,
964                                config.features.as_deref(),
965                                bin_name,
966                                &config.linking_mode,
967                            ),
968                            host,
969                        )
970                    }
971                };
972
973                env.add_service(service, host)
974            })
975            .collect::<Vec<_>>();
976        meta.insert(
977            self.key,
978            (0..(cluster_nodes.len() as u32))
979                .map(TaglessMemberId::from_raw_id)
980                .collect(),
981        );
982        *self.members.borrow_mut() = cluster_nodes
983            .into_iter()
984            .map(|n| DeployClusterNode { underlying: n })
985            .collect();
986    }
987
988    fn update_meta(&self, meta: &Self::Meta) {
989        for (cluster_id, node) in self.members.borrow().iter().enumerate() {
990            node.underlying.update_meta(HydroMeta {
991                clusters: meta.clone(),
992                cluster_id: Some(TaglessMemberId::from_raw_id(cluster_id as u32)),
993            });
994        }
995    }
996}
997
998#[expect(missing_docs, reason = "TODO")]
999#[derive(Clone)]
1000pub struct DeployProcessSpec(RustCrate, Arc<dyn Host>);
1001
1002impl DeployProcessSpec {
1003    #[expect(missing_docs, reason = "TODO")]
1004    pub fn new(t: RustCrate, host: Arc<dyn Host>) -> Self {
1005        Self(t, host)
1006    }
1007}
1008
1009impl ProcessSpec<'_, HydroDeploy> for DeployProcessSpec {
1010    fn build(self, _key: LocationKey, _name_hint: &str) -> DeployNode {
1011        DeployNode {
1012            next_port: Rc::new(RefCell::new(0)),
1013            service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Crate(self.0, self.1)))),
1014            underlying: Rc::new(RefCell::new(None)),
1015        }
1016    }
1017}
1018
1019impl ProcessSpec<'_, HydroDeploy> for TrybuildHost {
1020    fn build(mut self, key: LocationKey, name_hint: &str) -> DeployNode {
1021        self.name_hint = Some(format!("{} (process {})", name_hint, key));
1022        DeployNode {
1023            next_port: Rc::new(RefCell::new(0)),
1024            service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Trybuild(self)))),
1025            underlying: Rc::new(RefCell::new(None)),
1026        }
1027    }
1028}
1029
1030#[expect(missing_docs, reason = "TODO")]
1031#[derive(Clone)]
1032pub struct DeployClusterSpec(Vec<(RustCrate, Arc<dyn Host>)>);
1033
1034impl DeployClusterSpec {
1035    #[expect(missing_docs, reason = "TODO")]
1036    pub fn new(crates: Vec<(RustCrate, Arc<dyn Host>)>) -> Self {
1037        Self(crates)
1038    }
1039}
1040
1041impl ClusterSpec<'_, HydroDeploy> for DeployClusterSpec {
1042    fn build(self, key: LocationKey, _name_hint: &str) -> DeployCluster {
1043        DeployCluster {
1044            key,
1045            next_port: Rc::new(RefCell::new(0)),
1046            cluster_spec: Rc::new(RefCell::new(Some(
1047                self.0
1048                    .into_iter()
1049                    .map(|(c, h)| CrateOrTrybuild::Crate(c, h))
1050                    .collect(),
1051            ))),
1052            members: Rc::new(RefCell::new(vec![])),
1053            name_hint: None,
1054        }
1055    }
1056}
1057
1058impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDeploy> for I {
1059    fn build(self, key: LocationKey, name_hint: &str) -> DeployCluster {
1060        let name_hint = format!("{} (cluster {})", name_hint, key);
1061        DeployCluster {
1062            key,
1063            next_port: Rc::new(RefCell::new(0)),
1064            cluster_spec: Rc::new(RefCell::new(Some(
1065                self.into_iter()
1066                    .enumerate()
1067                    .map(|(idx, b)| {
1068                        let mut b = b.into();
1069                        b.name_hint = Some(name_hint.clone());
1070                        b.cluster_idx = Some(idx);
1071                        CrateOrTrybuild::Trybuild(b)
1072                    })
1073                    .collect(),
1074            ))),
1075            members: Rc::new(RefCell::new(vec![])),
1076            name_hint: Some(name_hint),
1077        }
1078    }
1079}
1080
1081fn create_trybuild_service(
1082    trybuild: TrybuildHost,
1083    dir: &std::path::Path,
1084    target_dir: &std::path::PathBuf,
1085    features: Option<&[String]>,
1086    bin_name: &str,
1087    linking_mode: &LinkingMode,
1088) -> RustCrate {
1089    // For dynamic linking, use the dylib-examples crate; for static, use the base crate
1090    let crate_dir = match linking_mode {
1091        LinkingMode::Dynamic => dir.join("dylib-examples"),
1092        LinkingMode::Static => dir.to_path_buf(),
1093    };
1094
1095    let mut ret = RustCrate::new(&crate_dir, dir)
1096        .target_dir(target_dir)
1097        .example(bin_name)
1098        .no_default_features();
1099
1100    ret = ret.set_is_dylib(matches!(linking_mode, LinkingMode::Dynamic));
1101
1102    if let Some(display_name) = trybuild.display_name {
1103        ret = ret.display_name(display_name);
1104    } else if let Some(name_hint) = trybuild.name_hint {
1105        if let Some(cluster_idx) = trybuild.cluster_idx {
1106            ret = ret.display_name(format!("{} / {}", name_hint, cluster_idx));
1107        } else {
1108            ret = ret.display_name(name_hint);
1109        }
1110    }
1111
1112    if let Some(rustflags) = trybuild.rustflags {
1113        ret = ret.rustflags(rustflags);
1114    }
1115
1116    if let Some(profile) = trybuild.profile {
1117        ret = ret.profile(profile);
1118    }
1119
1120    if let Some(tracing) = trybuild.tracing {
1121        ret = ret.tracing(tracing);
1122    }
1123
1124    ret = ret.features(
1125        vec!["hydro___feature_deploy_integration".to_owned()]
1126            .into_iter()
1127            .chain(
1128                trybuild
1129                    .additional_hydro_features
1130                    .into_iter()
1131                    .map(|runtime_feature| {
1132                        assert!(
1133                            HYDRO_RUNTIME_FEATURES.iter().any(|f| f == &runtime_feature),
1134                            "{runtime_feature} is not a valid Hydro runtime feature"
1135                        );
1136                        format!("hydro___feature_{runtime_feature}")
1137                    }),
1138            )
1139            .chain(trybuild.features),
1140    );
1141
1142    for (key, value) in trybuild.build_envs {
1143        ret = ret.build_env(key, value);
1144    }
1145
1146    ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
1147    ret = ret.config("build.incremental = false");
1148
1149    if let Some(features) = features {
1150        ret = ret.features(features);
1151    }
1152
1153    ret
1154}