Skip to main content

haste_operation_executor/providers/deno_embedded/
pool.rs

1use crate::extract_code_from_operation_definition;
2use crate::providers::deno_embedded::run_code;
3use crate::structs::PluginCodeType;
4use crate::traits::OperationExecutor;
5use deno_core::{error::AnyError, serde_json};
6use haste_fhir_client::FHIRClient;
7use haste_fhir_client::request::InvocationRequest;
8use haste_fhir_model::r4::generated::resources::{OperationDefinition, Parameters};
9use haste_fhir_model::r4::generated::terminology::IssueType;
10use haste_fhir_operation_error::OperationOutcomeError;
11use std::io;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::sync::mpsc;
15use std::thread::{self, JoinHandle};
16use tokio::runtime::Runtime;
17use tokio::sync::oneshot;
18
19type JobResult = Result<Option<serde_json::Value>, AnyError>;
20
21pub struct DenoPool {
22    next_worker: AtomicUsize,
23    workers: Vec<WorkerHandle>,
24}
25
26impl DenoPool {
27    pub fn new(thread_count: usize) -> Result<Self, AnyError> {
28        if thread_count == 0 {
29            return Err(io::Error::other("DenoPool requires at least one worker thread").into());
30        }
31
32        let mut workers = Vec::with_capacity(thread_count);
33
34        for index in 0..thread_count {
35            let result = spawn_worker(index);
36
37            match result {
38                Ok(worker) => workers.push(worker),
39                Err(error) => {
40                    shutdown_workers(&mut workers);
41                    return Err(error);
42                }
43            }
44        }
45
46        Ok(Self {
47            next_worker: AtomicUsize::new(0),
48            workers,
49        })
50    }
51
52    async fn execute<
53        CTX: Clone + Send + 'static,
54        Client: FHIRClient<CTX, OperationOutcomeError> + 'static,
55    >(
56        &self,
57        ctx: CTX,
58        client: Arc<Client>,
59        media_type: PluginCodeType,
60        code: impl Into<String>,
61    ) -> JobResult {
62        let worker_index = self.next_worker.fetch_add(1, Ordering::Relaxed) % self.workers.len();
63        let worker = &self.workers[worker_index];
64        let (response_tx, response_rx) = oneshot::channel();
65        let code = code.into();
66
67        let task = Box::new(move |runtime: &Runtime| {
68            let result = runtime.block_on(async move {
69                let output = run_code(ctx, client, media_type, code).await?;
70
71                output
72                    .map(serde_json::from_value)
73                    .transpose()
74                    .map_err(AnyError::from)
75            });
76
77            let _ = response_tx.send(result);
78        }) as Box<dyn WorkerTask>;
79
80        worker
81            .command_tx
82            .send(WorkerCommand::Run(task))
83            .map_err(|_| io::Error::other("DenoPool worker is not accepting jobs"))?;
84
85        response_rx
86            .await
87            .map_err(|_| io::Error::other("DenoPool worker dropped the response channel"))?
88    }
89}
90
91impl Drop for DenoPool {
92    fn drop(&mut self) {
93        shutdown_workers(&mut self.workers);
94    }
95}
96
97impl OperationExecutor for DenoPool {
98    async fn execute_operation<
99        CTX: Clone + Send + 'static,
100        Client: FHIRClient<CTX, OperationOutcomeError> + 'static,
101    >(
102        &self,
103        context: CTX,
104        client: Arc<Client>,
105        operation: &OperationDefinition,
106        _input: &InvocationRequest,
107    ) -> Result<Parameters, OperationOutcomeError> {
108        let (code, media_type) =
109            extract_code_from_operation_definition(operation).ok_or_else(|| {
110                OperationOutcomeError::error(
111                    IssueType::Invalid(None),
112                    "OperationDefinition missing custom code extension metadata".to_string(),
113                )
114            })?;
115
116        let media_type = PluginCodeType::try_from(media_type)?;
117
118        let output = self
119            .execute(context, client, media_type, code.to_string())
120            .await
121            .map_err(|error| {
122                OperationOutcomeError::error(
123                    IssueType::Processing(None),
124                    format!("Failed to execute operation custom code: {error}"),
125                )
126            })?
127            .ok_or_else(|| {
128                OperationOutcomeError::error(
129                    IssueType::Processing(None),
130                    "Operation custom code returned no output".to_string(),
131                )
132            })?;
133
134        haste_fhir_serialization_json::from_serde_value(output).map_err(|error| {
135            OperationOutcomeError::error(
136                IssueType::Invalid(None),
137                format!("Operation custom code returned invalid Parameters payload: {error}"),
138            )
139        })
140    }
141}
142
143struct WorkerHandle {
144    command_tx: mpsc::Sender<WorkerCommand>,
145    join_handle: Option<JoinHandle<()>>,
146}
147
148enum WorkerCommand {
149    Run(Box<dyn WorkerTask>),
150    Shutdown,
151}
152
153trait WorkerTask: Send + 'static {
154    fn run(self: Box<Self>, runtime: &Runtime);
155}
156
157impl<Function> WorkerTask for Function
158where
159    Function: FnOnce(&Runtime) + Send + 'static,
160{
161    fn run(self: Box<Self>, runtime: &Runtime) {
162        (*self)(runtime);
163    }
164}
165
166fn spawn_worker(index: usize) -> Result<WorkerHandle, AnyError> {
167    let (command_tx, command_rx) = mpsc::channel();
168    let (startup_tx, startup_rx) = mpsc::sync_channel(1);
169
170    let join_handle = thread::Builder::new()
171        .name(format!("deno-pool-{index}"))
172        .spawn(move || {
173            let runtime = match tokio::runtime::Builder::new_current_thread()
174                .enable_all()
175                .build()
176            {
177                Ok(runtime) => {
178                    let _ = startup_tx.send(Ok(()));
179                    runtime
180                }
181                Err(error) => {
182                    let _ = startup_tx.send(Err::<(), AnyError>(error.into()));
183                    return;
184                }
185            };
186
187            while let Ok(command) = command_rx.recv() {
188                match command {
189                    WorkerCommand::Run(task) => task.run(&runtime),
190                    WorkerCommand::Shutdown => break,
191                }
192            }
193        })?;
194
195    startup_rx
196        .recv()
197        .map_err(|_| io::Error::other("DenoPool worker failed during startup"))??;
198
199    Ok(WorkerHandle {
200        command_tx,
201        join_handle: Some(join_handle),
202    })
203}
204
205fn shutdown_workers(workers: &mut [WorkerHandle]) {
206    for worker in workers.iter() {
207        let _ = worker.command_tx.send(WorkerCommand::Shutdown);
208    }
209
210    for worker in workers.iter_mut() {
211        if let Some(join_handle) = worker.join_handle.take() {
212            let _ = join_handle.join();
213        }
214    }
215}