Skip to main content

haste_health/commands/
hl7v2.rs

1use crate::CLIState;
2use clap::Subcommand;
3use haste_fhir_client::FHIRClient;
4use haste_fhir_converter::Input;
5use haste_fhir_model::r4::generated::resources::Resource;
6use haste_fhir_model::r4::generated::terminology::{BundleType, IssueType};
7use haste_fhir_operation_error::OperationOutcomeError;
8use haste_hl7v2::mllp::MllpFormatter;
9use std::collections::HashMap;
10use std::io::Write;
11use std::net::TcpListener;
12use std::sync::Arc;
13use tokio::sync::Mutex;
14
15#[derive(Subcommand)]
16pub enum HL7v2Commands {
17    Receiver {
18        #[arg(short, long)]
19        address: String,
20        #[arg(short, long)]
21        port: u16,
22        #[arg(short, long)]
23        main: String,
24        #[arg(short, long)]
25        template_dir: String,
26    },
27    Sender {
28        #[arg(short, long)]
29        address: String,
30        #[arg(short, long)]
31        port: u16,
32    },
33}
34
35pub async fn hl7v2(
36    state: Arc<Mutex<CLIState>>,
37    command: &HL7v2Commands,
38) -> Result<(), OperationOutcomeError> {
39    let fhir_client = crate::client::fhir_client(state).await?;
40
41    match command {
42        HL7v2Commands::Receiver {
43            address,
44            port,
45            main,
46            template_dir,
47        } => {
48            let listener = TcpListener::bind(format!("{}:{}", address, port)).unwrap();
49
50            let environment = haste_fhir_converter::create_environment(Some(template_dir));
51
52            let template = environment.get_template(&main).map_err(|e| {
53                OperationOutcomeError::error(IssueType::Exception(None), e.to_string())
54            })?;
55
56            for stream in listener.incoming() {
57                let mut stream = match stream {
58                    Ok(s) => s,
59                    Err(e) => {
60                        eprintln!("Failed to accept connection: {}", e);
61                        continue;
62                    }
63                };
64
65                loop {
66                    let frame = match MllpFormatter::read_frame(&mut stream) {
67                        Ok(f) => f,
68                        Err(e) => {
69                            eprintln!("Connection ended: {}", e);
70                            break;
71                        }
72                    };
73
74                    let start = std::time::Instant::now();
75
76                    let hl7v2_bytes = match MllpFormatter::decode(frame.as_slice()) {
77                        Ok(b) => b.to_vec(),
78                        Err(e) => {
79                            eprintln!("Failed to decode MLLP frame: {}", e);
80                            let _ = stream.write_all(&MllpFormatter::nak());
81                            continue;
82                        }
83                    };
84
85                    let hl7v2_string = String::from_utf8_lossy(&hl7v2_bytes).to_string();
86
87                    let hl7v2 = haste_fhir_converter::convert_input(Input::HL7V2(hl7v2_string))?;
88
89                    let mut ctx = HashMap::new();
90                    ctx.insert("hl7v2", hl7v2);
91
92                    let haste_fhir_converter::Output::FHIR(resource) =
93                        haste_fhir_converter::transform(
94                            &template,
95                            ctx,
96                            &haste_fhir_converter::OutputFormat::FHIR,
97                        )?
98                    else {
99                        eprintln!("Unexpected output format from template");
100                        let _ = stream.write_all(&MllpFormatter::nak());
101                        continue;
102                    };
103
104                    tracing::info!("total transformation: {:?}", start.elapsed());
105
106                    match resource {
107                        Resource::Bundle(bundle) => match bundle.type_.as_ref() {
108                            BundleType::Batch(_) => match fhir_client.batch((), bundle).await {
109                                Ok(_) => {
110                                    if let Err(e) = stream.write_all(&MllpFormatter::ack()) {
111                                        eprintln!("Failed to send ACK: {}", e);
112                                        break;
113                                    }
114                                }
115                                Err(e) => {
116                                    eprintln!("Failed to send batch {}", e);
117                                    if let Err(e) = stream.write_all(&MllpFormatter::nak()) {
118                                        eprintln!("Failed to send NAK: {}", e);
119                                        break;
120                                    }
121                                }
122                            },
123                            BundleType::Transaction(_) => {
124                                match fhir_client.transaction((), bundle).await {
125                                    Ok(_) => {
126                                        if let Err(e) = stream.write_all(&MllpFormatter::ack()) {
127                                            eprintln!("Failed to send ACK: {}", e);
128                                            break;
129                                        }
130                                    }
131                                    Err(e) => {
132                                        eprintln!("Failed to send transaction {}", e);
133                                        if let Err(e) = stream.write_all(&MllpFormatter::nak()) {
134                                            eprintln!("Failed to send NAK: {}", e);
135                                            break;
136                                        }
137                                    }
138                                }
139                            }
140                            _ => {
141                                eprintln!("Unsupported Bundle type: {:?}", bundle.type_);
142                                let _ = stream.write_all(&MllpFormatter::nak());
143                                continue;
144                            }
145                        },
146                        _ => {
147                            let resource_type = resource.resource_type();
148                            match fhir_client.create((), resource_type, resource).await {
149                                Ok(_) => {
150                                    if let Err(e) = stream.write_all(&MllpFormatter::ack()) {
151                                        eprintln!("Failed to send ACK: {}", e);
152                                        break;
153                                    }
154                                }
155                                Err(e) => {
156                                    eprintln!("Failed to send resource {}", e);
157                                    if let Err(e) = stream.write_all(&MllpFormatter::nak()) {
158                                        eprintln!("Failed to send NAK: {}", e);
159                                        break;
160                                    }
161                                }
162                            }
163                        }
164                    }
165                }
166            }
167
168            Ok(())
169        }
170        HL7v2Commands::Sender {
171            address: _,
172            port: _,
173        } => {
174            todo!("HL7v2 sender not implemented yet");
175        }
176    }
177}