haste_health/commands/
hl7v2.rs1use crate::CLIState;
2use clap::Subcommand;
3use haste_fhir_client::FHIRClient;
4use haste_fhir_converter::Input;
5use haste_fhir_model::r4::generated::resources::Resource;
6use haste_fhir_model::r4::generated::terminology::{BundleType, IssueType};
7use haste_fhir_operation_error::OperationOutcomeError;
8use haste_hl7v2::mllp::MllpFormatter;
9use std::collections::HashMap;
10use std::io::Write;
11use std::net::TcpListener;
12use std::sync::Arc;
13use tokio::sync::Mutex;
14
15#[derive(Subcommand)]
16pub enum HL7v2Commands {
17 Receiver {
18 #[arg(short, long)]
19 address: String,
20 #[arg(short, long)]
21 port: u16,
22 #[arg(short, long)]
23 main: String,
24 #[arg(short, long)]
25 template_dir: String,
26 },
27 Sender {
28 #[arg(short, long)]
29 address: String,
30 #[arg(short, long)]
31 port: u16,
32 },
33}
34
35pub async fn hl7v2(
36 state: Arc<Mutex<CLIState>>,
37 command: &HL7v2Commands,
38) -> Result<(), OperationOutcomeError> {
39 let fhir_client = crate::client::fhir_client(state).await?;
40
41 match command {
42 HL7v2Commands::Receiver {
43 address,
44 port,
45 main,
46 template_dir,
47 } => {
48 let listener = TcpListener::bind(format!("{}:{}", address, port)).unwrap();
49
50 let environment = haste_fhir_converter::create_environment(Some(template_dir));
51
52 let template = environment.get_template(&main).map_err(|e| {
53 OperationOutcomeError::error(IssueType::Exception(None), e.to_string())
54 })?;
55
56 for stream in listener.incoming() {
57 let mut stream = match stream {
58 Ok(s) => s,
59 Err(e) => {
60 eprintln!("Failed to accept connection: {}", e);
61 continue;
62 }
63 };
64
65 loop {
66 let frame = match MllpFormatter::read_frame(&mut stream) {
67 Ok(f) => f,
68 Err(e) => {
69 eprintln!("Connection ended: {}", e);
70 break;
71 }
72 };
73
74 let start = std::time::Instant::now();
75
76 let hl7v2_bytes = match MllpFormatter::decode(frame.as_slice()) {
77 Ok(b) => b.to_vec(),
78 Err(e) => {
79 eprintln!("Failed to decode MLLP frame: {}", e);
80 let _ = stream.write_all(&MllpFormatter::nak());
81 continue;
82 }
83 };
84
85 let hl7v2_string = String::from_utf8_lossy(&hl7v2_bytes).to_string();
86
87 let hl7v2 = haste_fhir_converter::convert_input(Input::HL7V2(hl7v2_string))?;
88
89 let mut ctx = HashMap::new();
90 ctx.insert("hl7v2", hl7v2);
91
92 let haste_fhir_converter::Output::FHIR(resource) =
93 haste_fhir_converter::transform(
94 &template,
95 ctx,
96 &haste_fhir_converter::OutputFormat::FHIR,
97 )?
98 else {
99 eprintln!("Unexpected output format from template");
100 let _ = stream.write_all(&MllpFormatter::nak());
101 continue;
102 };
103
104 tracing::info!("total transformation: {:?}", start.elapsed());
105
106 match resource {
107 Resource::Bundle(bundle) => match bundle.type_.as_ref() {
108 BundleType::Batch(_) => match fhir_client.batch((), bundle).await {
109 Ok(_) => {
110 if let Err(e) = stream.write_all(&MllpFormatter::ack()) {
111 eprintln!("Failed to send ACK: {}", e);
112 break;
113 }
114 }
115 Err(e) => {
116 eprintln!("Failed to send batch {}", e);
117 if let Err(e) = stream.write_all(&MllpFormatter::nak()) {
118 eprintln!("Failed to send NAK: {}", e);
119 break;
120 }
121 }
122 },
123 BundleType::Transaction(_) => {
124 match fhir_client.transaction((), bundle).await {
125 Ok(_) => {
126 if let Err(e) = stream.write_all(&MllpFormatter::ack()) {
127 eprintln!("Failed to send ACK: {}", e);
128 break;
129 }
130 }
131 Err(e) => {
132 eprintln!("Failed to send transaction {}", e);
133 if let Err(e) = stream.write_all(&MllpFormatter::nak()) {
134 eprintln!("Failed to send NAK: {}", e);
135 break;
136 }
137 }
138 }
139 }
140 _ => {
141 eprintln!("Unsupported Bundle type: {:?}", bundle.type_);
142 let _ = stream.write_all(&MllpFormatter::nak());
143 continue;
144 }
145 },
146 _ => {
147 let resource_type = resource.resource_type();
148 match fhir_client.create((), resource_type, resource).await {
149 Ok(_) => {
150 if let Err(e) = stream.write_all(&MllpFormatter::ack()) {
151 eprintln!("Failed to send ACK: {}", e);
152 break;
153 }
154 }
155 Err(e) => {
156 eprintln!("Failed to send resource {}", e);
157 if let Err(e) = stream.write_all(&MllpFormatter::nak()) {
158 eprintln!("Failed to send NAK: {}", e);
159 break;
160 }
161 }
162 }
163 }
164 }
165 }
166 }
167
168 Ok(())
169 }
170 HL7v2Commands::Sender {
171 address: _,
172 port: _,
173 } => {
174 todo!("HL7v2 sender not implemented yet");
175 }
176 }
177}