Skip to main content

hydro_lang/deploy/
deploy_graph_containerized.rs

1//! Deployment backend for Hydro that uses Docker to provision and launch services.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::rc::Rc;
7
8use bollard::Docker;
9use bollard::models::{ContainerCreateBody, EndpointSettings, HostConfig, NetworkCreateRequest};
10use bollard::query_parameters::{
11    BuildImageOptions, CreateContainerOptions, InspectContainerOptions, KillContainerOptions,
12    RemoveContainerOptions, StartContainerOptions,
13};
14use bollard::secret::NetworkingConfig;
15use bytes::Bytes;
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, SinkExt, Stream, StreamExt};
18use http_body_util::Full;
19use hydro_deploy::rust_crate::build::{BuildError, build_crate_memoized};
20use hydro_deploy::{LinuxCompileType, RustCrate};
21use nanoid::nanoid;
22use proc_macro2::Span;
23use sinktools::lazy::LazySink;
24use stageleft::QuotedWithContext;
25use syn::parse_quote;
26use tar::{Builder, Header};
27use tokio::net::TcpStream;
28use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
29use tracing::{Instrument, instrument, trace, warn};
30
31use super::deploy_runtime_containerized::*;
32use crate::compile::builder::ExternalPortId;
33use crate::compile::deploy::DeployResult;
34use crate::compile::deploy_provider::{
35    ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
36};
37use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
38use crate::location::dynamic::LocationId;
39use crate::location::member_id::TaglessMemberId;
40use crate::location::{LocationKey, MembershipEvent, NetworkHint};
41
/// Represents a docker network that the containers of a deployment are attached to.
#[derive(Clone, Debug)]
pub struct DockerNetwork {
    /// Name handed to docker for this network. `DockerNetwork::new` builds it from the
    /// caller-supplied name plus a random suffix.
    name: String,
}
47
48impl DockerNetwork {
49    /// creates a new docker network (will actually be created when deployment.start() is called).
50    pub fn new(name: String) -> Self {
51        Self {
52            name: format!("{name}-{}", nanoid::nanoid!(6, &CONTAINER_ALPHABET)),
53        }
54    }
55}
56
/// Represents a process running in a docker container
#[derive(Clone)]
pub struct DockerDeployProcess {
    /// Location key of this process; recorded as a field on the tracing spans of the
    /// `Node` methods.
    key: LocationKey,
    /// Name of the process. It doubles as the docker image name during provisioning and
    /// is the input from which the container name is derived.
    name: String,
    /// Next port number that `next_port` will hand out. Shared between clones.
    next_port: Rc<RefCell<u16>>,
    /// Crate to compile for this process. Filled in by `instantiate` and taken out
    /// (leaving `None`) when the image is built.
    rust_crate: Rc<RefCell<Option<RustCrate>>>,

    /// Container ports that receive an `EXPOSE` directive in the generated Dockerfile.
    exposed_ports: Rc<RefCell<Vec<u16>>>,

    /// Name of the container running this process; set when the deployment is started.
    docker_container_name: Rc<RefCell<Option<String>>>,

    /// Optional rustflags applied when the crate is compiled.
    compilation_options: Option<String>,

    /// Extra configuration entries forwarded to `RustCrate::config` before building.
    config: Vec<String>,

    /// Docker network associated with this process.
    network: DockerNetwork,
}
75
76impl Node for DockerDeployProcess {
77    type Port = u16;
78    type Meta = ();
79    type InstantiateEnv = DockerDeploy;
80
81    #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
82    fn next_port(&self) -> Self::Port {
83        let port = {
84            let mut borrow = self.next_port.borrow_mut();
85            let port = *borrow;
86            *borrow += 1;
87            port
88        };
89
90        port
91    }
92
93    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
94    fn update_meta(&self, _meta: &Self::Meta) {}
95
96    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
97    fn instantiate(
98        &self,
99        _env: &mut Self::InstantiateEnv,
100        meta: &mut Self::Meta,
101        graph: DfirGraph,
102        extra_stmts: &[syn::Stmt],
103        sidecars: &[syn::Expr],
104    ) {
105        let (bin_name, config) = create_graph_trybuild(
106            graph,
107            extra_stmts,
108            sidecars,
109            Some(&self.name),
110            crate::compile::trybuild::generate::DeployMode::Containerized,
111            LinkingMode::Static,
112        );
113
114        let mut ret = RustCrate::new(&config.project_dir, &config.project_dir)
115            .target_dir(config.target_dir)
116            .example(bin_name)
117            .no_default_features();
118
119        ret = ret.display_name("test_display_name");
120
121        ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
122
123        if let Some(features) = config.features {
124            ret = ret.features(features);
125        }
126
127        ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
128        ret = ret.config("build.incremental = false");
129
130        *self.rust_crate.borrow_mut() = Some(ret);
131    }
132}
133
/// Represents a logical cluster, which can be a variable amount of individual containers.
#[derive(Clone)]
pub struct DockerDeployCluster {
    /// Location key of this cluster; recorded as a field on the tracing spans of the
    /// `Node` methods.
    key: LocationKey,
    /// Name of the cluster. It doubles as the docker image name during provisioning and
    /// is the input from which each member's container name is derived.
    name: String,
    /// Next port number that `next_port` will hand out. Shared between clones.
    next_port: Rc<RefCell<u16>>,
    /// Crate to compile for this cluster. Filled in by `instantiate` and taken out
    /// (leaving `None`) when the image is built.
    rust_crate: Rc<RefCell<Option<RustCrate>>>,

    /// Names of the cluster's containers; one entry is pushed per member when the
    /// deployment is started.
    docker_container_name: Rc<RefCell<Vec<String>>>,

    /// Optional rustflags applied when the crate is compiled.
    compilation_options: Option<String>,

    /// Extra configuration entries forwarded to `RustCrate::config` before building.
    config: Vec<String>,

    /// Number of containers that are started for this cluster.
    count: usize,
}
150
151impl Node for DockerDeployCluster {
152    type Port = u16;
153    type Meta = ();
154    type InstantiateEnv = DockerDeploy;
155
156    #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
157    fn next_port(&self) -> Self::Port {
158        let port = {
159            let mut borrow = self.next_port.borrow_mut();
160            let port = *borrow;
161            *borrow += 1;
162            port
163        };
164
165        port
166    }
167
168    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
169    fn update_meta(&self, _meta: &Self::Meta) {}
170
171    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, extra_stmts = extra_stmts.len()))]
172    fn instantiate(
173        &self,
174        _env: &mut Self::InstantiateEnv,
175        _meta: &mut Self::Meta,
176        graph: DfirGraph,
177        extra_stmts: &[syn::Stmt],
178        sidecars: &[syn::Expr],
179    ) {
180        let (bin_name, config) = create_graph_trybuild(
181            graph,
182            extra_stmts,
183            sidecars,
184            Some(&self.name),
185            crate::compile::trybuild::generate::DeployMode::Containerized,
186            LinkingMode::Static,
187        );
188
189        let mut ret = RustCrate::new(&config.project_dir, &config.project_dir)
190            .target_dir(config.target_dir)
191            .example(bin_name)
192            .no_default_features();
193
194        ret = ret.display_name("test_display_name");
195
196        ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
197
198        if let Some(features) = config.features {
199            ret = ret.features(features);
200        }
201
202        ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
203        ret = ret.config("build.incremental = false");
204
205        *self.rust_crate.borrow_mut() = Some(ret);
206    }
207}
208
/// Represents an external process, outside the control of this deployment but still with some communication into this deployment.
#[derive(Clone, Debug)]
pub struct DockerDeployExternal {
    /// Name of the external; recorded as a field on tracing spans.
    name: String,
    /// Next port number that `next_port` will hand out. Shared between clones.
    next_port: Rc<RefCell<u16>>,

    /// Port recorded for each external port id via `register`.
    ports: Rc<RefCell<HashMap<ExternalPortId, u16>>>,

    /// Connection details keyed by a registered port. Each entry holds the cell with the
    /// target container's name, the port inside that container, and a docker network.
    #[expect(clippy::type_complexity, reason = "internal code")]
    connection_info: Rc<RefCell<HashMap<u16, (Rc<RefCell<Option<String>>>, u16, DockerNetwork)>>>,
}
220
221impl Node for DockerDeployExternal {
222    type Port = u16;
223    type Meta = ();
224    type InstantiateEnv = DockerDeploy;
225
226    #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
227    fn next_port(&self) -> Self::Port {
228        let port = {
229            let mut borrow = self.next_port.borrow_mut();
230            let port = *borrow;
231            *borrow += 1;
232            port
233        };
234
235        port
236    }
237
238    #[instrument(level = "trace", skip_all, fields(name = self.name))]
239    fn update_meta(&self, _meta: &Self::Meta) {}
240
241    #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
242    fn instantiate(
243        &self,
244        _env: &mut Self::InstantiateEnv,
245        meta: &mut Self::Meta,
246        graph: DfirGraph,
247        extra_stmts: &[syn::Stmt],
248        sidecars: &[syn::Expr],
249    ) {
250        trace!(name: "surface", surface = graph.surface_syntax_string());
251    }
252}
253
/// A boxed, pinned `(source, sink)` pair: a stream yielding `Out` items together with a
/// sink accepting `In` items whose error type is `InErr`.
type DynSourceSink<Out, In, InErr> = (
    Pin<Box<dyn Stream<Item = Out>>>,
    Pin<Box<dyn Sink<In, Error = InErr>>>,
);
258
259impl<'a> RegisterPort<'a, DockerDeploy> for DockerDeployExternal {
260    #[instrument(level = "trace", skip_all, fields(name = self.name, %external_port_id, %port))]
261    fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
262        self.ports.borrow_mut().insert(external_port_id, port);
263    }
264
265    fn as_bytes_bidi(
266        &self,
267        external_port_id: ExternalPortId,
268    ) -> impl Future<
269        Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
270    > + 'a {
271        let guard =
272            tracing::trace_span!("as_bytes_bidi", name = %self.name, %external_port_id).entered();
273
274        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
275        let (docker_container_name, remote_port, _) = self
276            .connection_info
277            .borrow()
278            .get(&local_port)
279            .unwrap()
280            .clone();
281
282        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
283
284        async move {
285            let local_port =
286                find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
287            let remote_ip_address = "localhost";
288
289            trace!(name: "as_bytes_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
290
291            let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
292                .await
293                .unwrap();
294
295            trace!(name: "as_bytes_bidi_connected", to = %remote_ip_address, to_port = %local_port);
296
297            let (rx, tx) = stream.into_split();
298
299            let source = Box::pin(
300                FramedRead::new(rx, LengthDelimitedCodec::new()),
301            ) as Pin<Box<dyn Stream<Item = Result<bytes::BytesMut, std::io::Error>>>>;
302
303            let sink = Box::pin(FramedWrite::new(tx, LengthDelimitedCodec::new()))
304                as Pin<Box<dyn Sink<Bytes, Error = std::io::Error>>>;
305
306            (source, sink)
307        }
308        .instrument(guard.exit())
309    }
310
311    fn as_bincode_bidi<InT, OutT>(
312        &self,
313        external_port_id: ExternalPortId,
314    ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
315    where
316        InT: serde::Serialize + 'static,
317        OutT: serde::de::DeserializeOwned + 'static,
318    {
319        let guard =
320            tracing::trace_span!("as_bincode_bidi", name = %self.name, %external_port_id).entered();
321
322        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
323        let (docker_container_name, remote_port, _) = self
324            .connection_info
325            .borrow()
326            .get(&local_port)
327            .unwrap()
328            .clone();
329
330        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
331
332        async move {
333            let local_port =
334                find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
335            let remote_ip_address = "localhost";
336
337            trace!(name: "as_bincode_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
338
339            let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
340                .await
341                .unwrap();
342
343            trace!(name: "as_bincode_bidi_connected", to = %remote_ip_address, to_port = %local_port);
344
345            let (rx, tx) = stream.into_split();
346
347            let source = Box::pin(
348                FramedRead::new(rx, LengthDelimitedCodec::new())
349                    .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
350            ) as Pin<Box<dyn Stream<Item = OutT>>>;
351
352            let sink = Box::pin(
353                FramedWrite::new(tx, LengthDelimitedCodec::new()).with(move |v: InT| async move {
354                    Ok::<_, std::io::Error>(Bytes::from(bincode::serialize(&v).unwrap()))
355                }),
356            ) as Pin<Box<dyn Sink<InT, Error = std::io::Error>>>;
357
358            (source, sink)
359        }
360        .instrument(guard.exit())
361    }
362
363    fn as_bincode_sink<T>(
364        &self,
365        external_port_id: ExternalPortId,
366    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
367    where
368        T: serde::Serialize + 'static,
369    {
370        let guard =
371            tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
372
373        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
374        let (docker_container_name, remote_port, _) = self
375            .connection_info
376            .borrow()
377            .get(&local_port)
378            .unwrap()
379            .clone();
380
381        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
382
383        async move {
384            let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
385            let remote_ip_address = "localhost";
386
387            Box::pin(
388                LazySink::new(move || {
389                    Box::pin(async move {
390                        trace!(name: "as_bincode_sink_connecting", to = %remote_ip_address, to_port = %local_port);
391
392                        let stream =
393                            TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
394                                .await?;
395
396                        trace!(name: "as_bincode_sink_connected", to = %remote_ip_address, to_port = %local_port);
397
398                        Result::<_, std::io::Error>::Ok(FramedWrite::new(
399                            stream,
400                            LengthDelimitedCodec::new(),
401                        ))
402                    })
403                })
404                .with(move |v| async move {
405                    Ok(Bytes::from(bincode::serialize(&v).unwrap()))
406                }),
407            ) as Pin<Box<dyn Sink<T, Error = std::io::Error>>>
408        }
409        .instrument(guard.exit())
410    }
411
412    fn as_bincode_source<T>(
413        &self,
414        external_port_id: ExternalPortId,
415    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
416    where
417        T: serde::de::DeserializeOwned + 'static,
418    {
419        let guard =
420            tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
421
422        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
423        let (docker_container_name, remote_port, _) = self
424            .connection_info
425            .borrow()
426            .get(&local_port)
427            .unwrap()
428            .clone();
429
430        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
431
432        async move {
433
434            let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
435            let remote_ip_address = "localhost";
436
437            trace!(name: "as_bincode_source_connecting", to = %remote_ip_address, to_port = %local_port);
438
439            let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
440                .await
441                .unwrap();
442
443            trace!(name: "as_bincode_source_connected", to = %remote_ip_address, to_port = %local_port);
444
445            Box::pin(
446                FramedRead::new(stream, LengthDelimitedCodec::new())
447                    .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
448            ) as Pin<Box<dyn Stream<Item = T>>>
449        }
450        .instrument(guard.exit())
451    }
452}
453
454#[instrument(level = "trace", skip_all, fields(%docker_container_name, %destination_port))]
455async fn find_dynamically_allocated_docker_port(
456    docker_container_name: &str,
457    destination_port: u16,
458) -> u16 {
459    let docker = Docker::connect_with_local_defaults().unwrap();
460
461    let container_info = docker
462        .inspect_container(docker_container_name, None::<InspectContainerOptions>)
463        .await
464        .unwrap();
465
466    trace!(name: "port struct", container_info = ?container_info.network_settings.as_ref().unwrap().ports.as_ref().unwrap());
467
468    // container_info={"1001/tcp": Some([PortBinding { host_ip: Some("0.0.0.0"), host_port: Some("32771") }, PortBinding { host_ip: Some("::"), host_port: Some("32771") }])} destination_port=1001
469    let remote_port = container_info
470        .network_settings
471        .as_ref()
472        .unwrap()
473        .ports
474        .as_ref()
475        .unwrap()
476        .get(&format!("{destination_port}/tcp"))
477        .unwrap()
478        .as_ref()
479        .unwrap()
480        .iter()
481        .find(|v| v.host_ip == Some("0.0.0.0".to_owned()))
482        .unwrap()
483        .host_port
484        .as_ref()
485        .unwrap()
486        .parse()
487        .unwrap();
488
489    remote_port
490}
491
/// For deploying to a local docker instance
pub struct DockerDeploy {
    /// Specs of every process added through `add_localhost_docker`.
    docker_processes: Vec<DockerDeployProcessSpec>,
    /// Specs of every cluster added through `add_localhost_docker_cluster`.
    docker_clusters: Vec<DockerDeployClusterSpec>,
    /// Network that is created on `start`, joined by every container, and removed on
    /// `cleanup`.
    network: DockerNetwork,
    /// Random identifier generated in `new`; passed to each container through the
    /// `DEPLOYMENT_INSTANCE` environment variable.
    deployment_instance: String,
}
499
500#[instrument(level = "trace", skip_all, fields(%image_name, %container_name, %network_name, %deployment_instance))]
501async fn create_and_start_container(
502    docker: &Docker,
503    container_name: &str,
504    image_name: &str,
505    network_name: &str,
506    deployment_instance: &str,
507) -> Result<(), anyhow::Error> {
508    let config = ContainerCreateBody {
509        image: Some(image_name.to_owned()),
510        hostname: Some(container_name.to_owned()),
511        host_config: Some(HostConfig {
512            binds: Some(vec!["/var/run/docker.sock:/var/run/docker.sock".to_owned()]),
513            publish_all_ports: Some(true),
514            port_bindings: Some(HashMap::new()), /* Due to a bug in docker, if you don't send empty port bindings with publish_all_ports set to true and with a docker image that has EXPOSE directives in it, docker will crash because it will try to write to a map in memory that it has not initialized yet. Setting port_bindings explicitly to an empty map will initialize it first so that it does not break. */
515            ..Default::default()
516        }),
517        env: Some(vec![
518            format!("CONTAINER_NAME={container_name}"),
519            format!("DEPLOYMENT_INSTANCE={deployment_instance}"),
520            format!("RUST_LOG=trace"),
521        ]),
522        networking_config: Some(NetworkingConfig {
523            endpoints_config: Some(HashMap::from([(
524                network_name.to_owned(),
525                EndpointSettings {
526                    ..Default::default()
527                },
528            )])),
529        }),
530        tty: Some(true),
531        ..Default::default()
532    };
533
534    let options = CreateContainerOptions {
535        name: Some(container_name.to_owned()),
536        ..Default::default()
537    };
538
539    tracing::error!("Config: {}", serde_json::to_string_pretty(&config).unwrap());
540    docker.create_container(Some(options), config).await?;
541    docker
542        .start_container(container_name, None::<StartContainerOptions>)
543        .await?;
544
545    Ok(())
546}
547
548#[instrument(level = "trace", skip_all, fields(%image_name))]
549async fn build_and_create_image(
550    rust_crate: &Rc<RefCell<Option<RustCrate>>>,
551    compilation_options: Option<&str>,
552    config: &[String],
553    exposed_ports: &[u16],
554    image_name: &str,
555) -> Result<(), anyhow::Error> {
556    let mut rust_crate = rust_crate
557        .borrow_mut()
558        .take()
559        .unwrap()
560        .rustflags(compilation_options.unwrap_or_default());
561
562    for cfg in config {
563        rust_crate = rust_crate.config(cfg);
564    }
565
566    let build_output = match build_crate_memoized(
567        rust_crate.get_build_params(hydro_deploy::HostTargetType::Linux(LinuxCompileType::Musl)),
568    )
569    .await
570    {
571        Ok(build_output) => build_output,
572        Err(BuildError::FailedToBuildCrate {
573            exit_status,
574            diagnostics,
575            text_lines,
576            stderr_lines,
577        }) => {
578            let diagnostics = diagnostics
579                .into_iter()
580                .map(|d| d.rendered.unwrap())
581                .collect::<Vec<_>>()
582                .join("\n");
583            let text_lines = text_lines.join("\n");
584            let stderr_lines = stderr_lines.join("\n");
585
586            anyhow::bail!(
587                r#"
588Failed to build crate {exit_status:?}
589--- diagnostics
590---
591{diagnostics}
592---
593---
594---
595
596--- text_lines
597---
598---
599{text_lines}
600---
601---
602---
603
604--- stderr_lines
605---
606---
607{stderr_lines}
608---
609---
610---"#
611            );
612        }
613        Err(err) => {
614            anyhow::bail!("Failed to build crate {err:?}");
615        }
616    };
617
618    let docker = Docker::connect_with_local_defaults()?;
619
620    let mut tar_data = Vec::new();
621    {
622        let mut tar = Builder::new(&mut tar_data);
623
624        let exposed_ports = exposed_ports
625            .iter()
626            .map(|port| format!("EXPOSE {port}/tcp"))
627            .collect::<Vec<_>>()
628            .join("\n");
629
630        let dockerfile_content = format!(
631            r#"
632                FROM scratch
633                {exposed_ports}
634                COPY app /app
635                CMD ["/app"]
636            "#,
637        );
638
639        trace!(name: "dockerfile", %dockerfile_content);
640
641        let mut header = Header::new_gnu();
642        header.set_path("Dockerfile")?;
643        header.set_size(dockerfile_content.len() as u64);
644        header.set_cksum();
645        tar.append(&header, dockerfile_content.as_bytes())?;
646
647        let mut header = Header::new_gnu();
648        header.set_path("app")?;
649        header.set_size(build_output.bin_data.len() as u64);
650        header.set_mode(0o755);
651        header.set_cksum();
652        tar.append(&header, &build_output.bin_data[..])?;
653
654        tar.finish()?;
655    }
656
657    let build_options = BuildImageOptions {
658        dockerfile: "Dockerfile".to_owned(),
659        t: Some(image_name.to_owned()),
660        rm: true,
661        ..Default::default()
662    };
663
664    use bollard::errors::Error;
665
666    let body = http_body_util::Either::Left(Full::new(Bytes::from(tar_data)));
667    let mut build_stream = docker.build_image(build_options, None, Some(body));
668    while let Some(msg) = build_stream.next().await {
669        match msg {
670            Ok(_) => {}
671            Err(e) => match e {
672                Error::DockerStreamError { error } => {
673                    return Err(anyhow::anyhow!(
674                        "Docker build failed: DockerStreamError: {{ error: {error} }}"
675                    ));
676                }
677                _ => return Err(anyhow::anyhow!("Docker build failed: {}", e)),
678            },
679        }
680    }
681
682    Ok(())
683}
684
685impl DockerDeploy {
686    /// Create a new deployment
687    pub fn new(network: DockerNetwork) -> Self {
688        Self {
689            docker_processes: Vec::new(),
690            docker_clusters: Vec::new(),
691            network,
692            deployment_instance: nanoid!(6, &CONTAINER_ALPHABET),
693        }
694    }
695
696    /// Add an internal docker service to the deployment.
697    pub fn add_localhost_docker(
698        &mut self,
699        compilation_options: Option<String>,
700        config: Vec<String>,
701    ) -> DockerDeployProcessSpec {
702        let process = DockerDeployProcessSpec {
703            compilation_options,
704            config,
705            network: self.network.clone(),
706            deployment_instance: self.deployment_instance.clone(),
707        };
708
709        self.docker_processes.push(process.clone());
710
711        process
712    }
713
714    /// Add an internal docker cluster to the deployment.
715    pub fn add_localhost_docker_cluster(
716        &mut self,
717        compilation_options: Option<String>,
718        config: Vec<String>,
719        count: usize,
720    ) -> DockerDeployClusterSpec {
721        let cluster = DockerDeployClusterSpec {
722            compilation_options,
723            config,
724            count,
725            deployment_instance: self.deployment_instance.clone(),
726        };
727
728        self.docker_clusters.push(cluster.clone());
729
730        cluster
731    }
732
    /// Add an external process to the deployment.
    ///
    /// Returns a spec carrying only `name`; nothing is recorded on the deployment itself.
    pub fn add_external(&self, name: String) -> DockerDeployExternalSpec {
        DockerDeployExternalSpec { name }
    }
737
    /// Get the deployment instance from this deployment.
    ///
    /// Returns a copy of the random identifier generated when the deployment was created.
    pub fn get_deployment_instance(&self) -> String {
        self.deployment_instance.clone()
    }
742
743    /// Create docker images.
744    #[instrument(level = "trace", skip_all)]
745    pub async fn provision(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
746        for (_, _, process) in nodes.get_all_processes() {
747            let exposed_ports = process.exposed_ports.borrow().clone();
748
749            build_and_create_image(
750                &process.rust_crate,
751                process.compilation_options.as_deref(),
752                &process.config,
753                &exposed_ports,
754                &process.name,
755            )
756            .await?;
757        }
758
759        for (_, _, cluster) in nodes.get_all_clusters() {
760            build_and_create_image(
761                &cluster.rust_crate,
762                cluster.compilation_options.as_deref(),
763                &cluster.config,
764                &[], // clusters don't have exposed ports.
765                &cluster.name,
766            )
767            .await?;
768        }
769
770        Ok(())
771    }
772
773    /// Start the deployment, tell docker to create containers from the existing provisioned images.
774    #[instrument(level = "trace", skip_all)]
775    pub async fn start(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
776        let docker = Docker::connect_with_local_defaults()?;
777
778        match docker
779            .create_network(NetworkCreateRequest {
780                name: self.network.name.clone(),
781                driver: Some("bridge".to_owned()),
782                ..Default::default()
783            })
784            .await
785        {
786            Ok(v) => v.id,
787            Err(e) => {
788                panic!("Failed to create docker network: {e:?}");
789            }
790        };
791
792        for (_, _, process) in nodes.get_all_processes() {
793            let docker_container_name: String = get_docker_container_name(&process.name, None);
794            *process.docker_container_name.borrow_mut() = Some(docker_container_name.clone());
795
796            create_and_start_container(
797                &docker,
798                &docker_container_name,
799                &process.name,
800                &self.network.name,
801                &self.deployment_instance,
802            )
803            .await?;
804        }
805
806        for (_, _, cluster) in nodes.get_all_clusters() {
807            for num in 0..cluster.count {
808                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
809                cluster
810                    .docker_container_name
811                    .borrow_mut()
812                    .push(docker_container_name.clone());
813
814                create_and_start_container(
815                    &docker,
816                    &docker_container_name,
817                    &cluster.name,
818                    &self.network.name,
819                    &self.deployment_instance,
820                )
821                .await?;
822            }
823        }
824
825        Ok(())
826    }
827
828    /// Stop the deployment, destroy all containers
829    #[instrument(level = "trace", skip_all)]
830    pub async fn stop(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
831        let docker = Docker::connect_with_local_defaults()?;
832
833        for (_, _, process) in nodes.get_all_processes() {
834            let docker_container_name: String = get_docker_container_name(&process.name, None);
835
836            docker
837                .kill_container(&docker_container_name, None::<KillContainerOptions>)
838                .await?;
839        }
840
841        for (_, _, cluster) in nodes.get_all_clusters() {
842            for num in 0..cluster.count {
843                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
844
845                docker
846                    .kill_container(&docker_container_name, None::<KillContainerOptions>)
847                    .await?;
848            }
849        }
850
851        Ok(())
852    }
853
854    /// remove containers, images, and networks.
855    #[instrument(level = "trace", skip_all)]
856    pub async fn cleanup(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
857        let docker = Docker::connect_with_local_defaults()?;
858
859        for (_, _, process) in nodes.get_all_processes() {
860            let docker_container_name: String = get_docker_container_name(&process.name, None);
861
862            docker
863                .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
864                .await?;
865        }
866
867        for (_, _, cluster) in nodes.get_all_clusters() {
868            for num in 0..cluster.count {
869                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
870
871                docker
872                    .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
873                    .await?;
874            }
875        }
876
877        docker
878            .remove_network(&self.network.name)
879            .await
880            .map_err(|e| anyhow::anyhow!("Failed to remove docker network: {e:?}"))?;
881
882        use bollard::query_parameters::RemoveImageOptions;
883
884        for (_, _, process) in nodes.get_all_processes() {
885            docker
886                .remove_image(&process.name, None::<RemoveImageOptions>, None)
887                .await?;
888        }
889
890        for (_, _, cluster) in nodes.get_all_clusters() {
891            docker
892                .remove_image(&cluster.name, None::<RemoveImageOptions>, None)
893                .await?;
894        }
895
896        Ok(())
897    }
898}
899
900impl<'a> Deploy<'a> for DockerDeploy {
901    type Meta = ();
902    type InstantiateEnv = Self;
903
904    type Process = DockerDeployProcess;
905    type Cluster = DockerDeployCluster;
906    type External = DockerDeployExternal;
907
908    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
909    fn o2o_sink_source(
910        _env: &mut Self::InstantiateEnv,
911        p1: &Self::Process,
912        p1_port: &<Self::Process as Node>::Port,
913        p2: &Self::Process,
914        p2_port: &<Self::Process as Node>::Port,
915        _name: Option<&str>,
916    ) -> (syn::Expr, syn::Expr) {
917        let bind_addr = format!("0.0.0.0:{}", p2_port);
918        let target = format!("{}:{p2_port}", p2.name);
919
920        deploy_containerized_o2o(target.as_str(), bind_addr.as_str())
921    }
922
923    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
924    fn o2o_connect(
925        p1: &Self::Process,
926        p1_port: &<Self::Process as Node>::Port,
927        p2: &Self::Process,
928        p2_port: &<Self::Process as Node>::Port,
929    ) -> Box<dyn FnOnce()> {
930        let serialized = format!("o2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
931
932        Box::new(move || {
933            trace!(name: "o2o_connect thunk", %serialized);
934        })
935    }
936
937    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
938    fn o2m_sink_source(
939        p1: &Self::Process,
940        p1_port: &<Self::Process as Node>::Port,
941        c2: &Self::Cluster,
942        c2_port: &<Self::Cluster as Node>::Port,
943    ) -> (syn::Expr, syn::Expr) {
944        deploy_containerized_o2m(*c2_port)
945    }
946
947    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
948    fn o2m_connect(
949        p1: &Self::Process,
950        p1_port: &<Self::Process as Node>::Port,
951        c2: &Self::Cluster,
952        c2_port: &<Self::Cluster as Node>::Port,
953    ) -> Box<dyn FnOnce()> {
954        let serialized = format!("o2m_connect {}:{p1_port} -> {}:{c2_port}", p1.name, c2.name);
955
956        Box::new(move || {
957            trace!(name: "o2m_connect thunk", %serialized);
958        })
959    }
960
961    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
962    fn m2o_sink_source(
963        c1: &Self::Cluster,
964        c1_port: &<Self::Cluster as Node>::Port,
965        p2: &Self::Process,
966        p2_port: &<Self::Process as Node>::Port,
967    ) -> (syn::Expr, syn::Expr) {
968        deploy_containerized_m2o(*p2_port, &p2.name)
969    }
970
971    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
972    fn m2o_connect(
973        c1: &Self::Cluster,
974        c1_port: &<Self::Cluster as Node>::Port,
975        p2: &Self::Process,
976        p2_port: &<Self::Process as Node>::Port,
977    ) -> Box<dyn FnOnce()> {
978        let serialized = format!("o2m_connect {}:{c1_port} -> {}:{p2_port}", c1.name, p2.name);
979
980        Box::new(move || {
981            trace!(name: "m2o_connect thunk", %serialized);
982        })
983    }
984
985    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
986    fn m2m_sink_source(
987        c1: &Self::Cluster,
988        c1_port: &<Self::Cluster as Node>::Port,
989        c2: &Self::Cluster,
990        c2_port: &<Self::Cluster as Node>::Port,
991    ) -> (syn::Expr, syn::Expr) {
992        deploy_containerized_m2m(*c2_port)
993    }
994
995    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
996    fn m2m_connect(
997        c1: &Self::Cluster,
998        c1_port: &<Self::Cluster as Node>::Port,
999        c2: &Self::Cluster,
1000        c2_port: &<Self::Cluster as Node>::Port,
1001    ) -> Box<dyn FnOnce()> {
1002        let serialized = format!("m2m_connect {}:{c1_port} -> {}:{c2_port}", c1.name, c2.name);
1003
1004        Box::new(move || {
1005            trace!(name: "m2m_connect thunk", %serialized);
1006        })
1007    }
1008
1009    #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, %shared_handle, extra_stmts = extra_stmts.len()))]
1010    fn e2o_many_source(
1011        extra_stmts: &mut Vec<syn::Stmt>,
1012        p2: &Self::Process,
1013        p2_port: &<Self::Process as Node>::Port,
1014        codec_type: &syn::Type,
1015        shared_handle: String,
1016    ) -> syn::Expr {
1017        p2.exposed_ports.borrow_mut().push(*p2_port);
1018
1019        let socket_ident = syn::Ident::new(
1020            &format!("__hydro_deploy_many_{}_socket", &shared_handle),
1021            Span::call_site(),
1022        );
1023
1024        let source_ident = syn::Ident::new(
1025            &format!("__hydro_deploy_many_{}_source", &shared_handle),
1026            Span::call_site(),
1027        );
1028
1029        let sink_ident = syn::Ident::new(
1030            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1031            Span::call_site(),
1032        );
1033
1034        let membership_ident = syn::Ident::new(
1035            &format!("__hydro_deploy_many_{}_membership", &shared_handle),
1036            Span::call_site(),
1037        );
1038
1039        let bind_addr = format!("0.0.0.0:{}", p2_port);
1040
1041        extra_stmts.push(syn::parse_quote! {
1042            let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1043        });
1044
1045        let root = crate::staging_util::get_this_crate();
1046
1047        extra_stmts.push(syn::parse_quote! {
1048            let (#source_ident, #sink_ident, #membership_ident) = #root::runtime_support::hydro_deploy_integration::multi_connection::tcp_multi_connection::<_, #codec_type>(#socket_ident);
1049        });
1050
1051        parse_quote!(#source_ident)
1052    }
1053
1054    #[instrument(level = "trace", skip_all, fields(%shared_handle))]
1055    fn e2o_many_sink(shared_handle: String) -> syn::Expr {
1056        let sink_ident = syn::Ident::new(
1057            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1058            Span::call_site(),
1059        );
1060        parse_quote!(#sink_ident)
1061    }
1062
1063    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1064    fn e2o_source(
1065        extra_stmts: &mut Vec<syn::Stmt>,
1066        p1: &Self::External,
1067        p1_port: &<Self::External as Node>::Port,
1068        p2: &Self::Process,
1069        p2_port: &<Self::Process as Node>::Port,
1070        _codec_type: &syn::Type,
1071        shared_handle: String,
1072    ) -> syn::Expr {
1073        p1.connection_info.borrow_mut().insert(
1074            *p1_port,
1075            (
1076                p2.docker_container_name.clone(),
1077                *p2_port,
1078                p2.network.clone(),
1079            ),
1080        );
1081
1082        p2.exposed_ports.borrow_mut().push(*p2_port);
1083
1084        let socket_ident = syn::Ident::new(
1085            &format!("__hydro_deploy_{}_socket", &shared_handle),
1086            Span::call_site(),
1087        );
1088
1089        let source_ident = syn::Ident::new(
1090            &format!("__hydro_deploy_{}_source", &shared_handle),
1091            Span::call_site(),
1092        );
1093
1094        let sink_ident = syn::Ident::new(
1095            &format!("__hydro_deploy_{}_sink", &shared_handle),
1096            Span::call_site(),
1097        );
1098
1099        let bind_addr = format!("0.0.0.0:{}", p2_port);
1100
1101        extra_stmts.push(syn::parse_quote! {
1102            let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1103        });
1104
1105        let create_expr = deploy_containerized_external_sink_source_ident(socket_ident);
1106
1107        extra_stmts.push(syn::parse_quote! {
1108            let (#sink_ident, #source_ident) = (#create_expr).split();
1109        });
1110
1111        parse_quote!(#source_ident)
1112    }
1113
1114    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
1115    fn e2o_connect(
1116        p1: &Self::External,
1117        p1_port: &<Self::External as Node>::Port,
1118        p2: &Self::Process,
1119        p2_port: &<Self::Process as Node>::Port,
1120        many: bool,
1121        server_hint: NetworkHint,
1122    ) -> Box<dyn FnOnce()> {
1123        if server_hint != NetworkHint::Auto {
1124            panic!(
1125                "Docker deployment only supports NetworkHint::Auto, got {:?}",
1126                server_hint
1127            );
1128        }
1129
1130        // For many connections, we need to populate connection_info so as_bincode_bidi can find it
1131        if many {
1132            p1.connection_info.borrow_mut().insert(
1133                *p1_port,
1134                (
1135                    p2.docker_container_name.clone(),
1136                    *p2_port,
1137                    p2.network.clone(),
1138                ),
1139            );
1140        }
1141
1142        let serialized = format!("e2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
1143
1144        Box::new(move || {
1145            trace!(name: "e2o_connect thunk", %serialized);
1146        })
1147    }
1148
1149    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1150    fn o2e_sink(
1151        p1: &Self::Process,
1152        p1_port: &<Self::Process as Node>::Port,
1153        p2: &Self::External,
1154        p2_port: &<Self::External as Node>::Port,
1155        shared_handle: String,
1156    ) -> syn::Expr {
1157        let sink_ident = syn::Ident::new(
1158            &format!("__hydro_deploy_{}_sink", &shared_handle),
1159            Span::call_site(),
1160        );
1161        parse_quote!(#sink_ident)
1162    }
1163
1164    #[instrument(level = "trace", skip_all, fields(%of_cluster))]
1165    fn cluster_ids(
1166        of_cluster: LocationKey,
1167    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
1168        cluster_ids()
1169    }
1170
1171    #[instrument(level = "trace", skip_all)]
1172    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
1173        cluster_self_id()
1174    }
1175
1176    #[instrument(level = "trace", skip_all, fields(?location_id))]
1177    fn cluster_membership_stream(
1178        location_id: &LocationId,
1179    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
1180    {
1181        cluster_membership_stream(location_id)
1182    }
1183}
1184
/// Alphabet of ASCII digits and lowercase ASCII letters, passed to `nanoid!`
/// when generating the random suffixes of Docker network and image names.
const CONTAINER_ALPHABET: [char; 36] = [
    // Digits.
    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
    // Lowercase letters.
    'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
    'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
];
1189
1190#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location_key, %deployment_instance))]
1191fn get_docker_image_name(
1192    name_hint: &str,
1193    location_key: LocationKey,
1194    deployment_instance: &str,
1195) -> String {
1196    let name_hint = name_hint
1197        .split("::")
1198        .last()
1199        .unwrap()
1200        .to_ascii_lowercase()
1201        .replace(".", "-")
1202        .replace("_", "-")
1203        .replace("::", "-");
1204
1205    let image_unique_tag = nanoid::nanoid!(6, &CONTAINER_ALPHABET);
1206
1207    format!("hy-{name_hint}-{image_unique_tag}-{deployment_instance}-{location_key}")
1208}
1209
1210#[instrument(level = "trace", skip_all, ret, fields(%image_name, ?instance))]
1211fn get_docker_container_name(image_name: &str, instance: Option<usize>) -> String {
1212    if let Some(instance) = instance {
1213        format!("{image_name}-{instance}")
1214    } else {
1215        image_name.to_owned()
1216    }
1217}
/// Represents a Process running in a docker container.
///
/// This is the not-yet-built description of the process; `ProcessSpec::build`
/// turns it into a `DockerDeployProcess`.
#[derive(Clone)]
pub struct DockerDeployProcessSpec {
    // Forwarded unchanged to the built process (presumably extra build
    // options — confirm against where the process is compiled).
    compilation_options: Option<String>,
    // Forwarded unchanged to the built process.
    config: Vec<String>,
    // Docker network handed to the built process.
    network: DockerNetwork,
    // Identifier of the deployment; becomes part of the generated image name.
    deployment_instance: String,
}
1226
1227impl<'a> ProcessSpec<'a, DockerDeploy> for DockerDeployProcessSpec {
1228    #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1229    fn build(self, key: LocationKey, name_hint: &'_ str) -> <DockerDeploy as Deploy<'a>>::Process {
1230        DockerDeployProcess {
1231            key,
1232            name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1233
1234            next_port: Rc::new(RefCell::new(1000)),
1235            rust_crate: Rc::new(RefCell::new(None)),
1236
1237            exposed_ports: Rc::new(RefCell::new(Vec::new())),
1238
1239            docker_container_name: Rc::new(RefCell::new(None)),
1240
1241            compilation_options: self.compilation_options,
1242            config: self.config,
1243
1244            network: self.network.clone(),
1245        }
1246    }
1247}
1248
/// Represents a Cluster running across `count` docker containers.
///
/// This is the not-yet-built description of the cluster; `ClusterSpec::build`
/// turns it into a `DockerDeployCluster`.
#[derive(Clone)]
pub struct DockerDeployClusterSpec {
    // Forwarded unchanged to the built cluster (presumably extra build
    // options — confirm against where the cluster is compiled).
    compilation_options: Option<String>,
    // Forwarded unchanged to the built cluster.
    config: Vec<String>,
    // Number of containers in the cluster.
    count: usize,
    // Identifier of the deployment; becomes part of the generated image name.
    deployment_instance: String,
}
1257
1258impl<'a> ClusterSpec<'a, DockerDeploy> for DockerDeployClusterSpec {
1259    #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1260    fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::Cluster {
1261        DockerDeployCluster {
1262            key,
1263            name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1264
1265            next_port: Rc::new(RefCell::new(1000)),
1266            rust_crate: Rc::new(RefCell::new(None)),
1267
1268            docker_container_name: Rc::new(RefCell::new(Vec::new())),
1269
1270            compilation_options: self.compilation_options,
1271            config: self.config,
1272
1273            count: self.count,
1274        }
1275    }
1276}
1277
/// Represents an external process outside of the management of hydro deploy.
///
/// `ExternalSpec::build` turns this spec into a `DockerDeployExternal`.
pub struct DockerDeployExternalSpec {
    // Name given to the built external.
    name: String,
}
1282
1283impl<'a> ExternalSpec<'a, DockerDeploy> for DockerDeployExternalSpec {
1284    #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1285    fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::External {
1286        DockerDeployExternal {
1287            name: self.name,
1288            next_port: Rc::new(RefCell::new(10000)),
1289            ports: Rc::new(RefCell::new(HashMap::new())),
1290            connection_info: Rc::new(RefCell::new(HashMap::new())),
1291        }
1292    }
1293}