haste_server/
services.rs

1use crate::{
2    ServerEnvironmentVariables,
3    fhir_client::{FHIRServerClient, ServerClientConfig},
4};
5use haste_config::Config;
6use haste_fhir_model::r4::generated::terminology::IssueType;
7use haste_fhir_operation_error::{OperationOutcomeError, derive::OperationOutcomeError};
8use haste_fhir_search::{SearchEngine, elastic_search::ElasticSearchEngine};
9use haste_fhir_terminology::{
10    FHIRTerminology,
11    client::FHIRCanonicalTerminology,
12    resolvers::{self, remote::LRUCanonicalRemoteResolver},
13};
14use haste_fhirpath::FPEngine;
15use haste_repository::{Repository, pg::PGConnection};
16use sqlx::{Pool, Postgres};
17use sqlx_postgres::PgPoolOptions;
18use std::{env::VarError, sync::Arc};
19use tokio::sync::OnceCell;
20use tracing::info;
21
22// Singleton for the database connection pool in postgres.
23static POOL: OnceCell<Pool<Postgres>> = OnceCell::const_new();
24pub async fn get_pool(config: &dyn Config<ServerEnvironmentVariables>) -> &'static Pool<Postgres> {
25    POOL.get_or_init(async || {
26        let database_url = config
27            .get(ServerEnvironmentVariables::DataBaseURL)
28            .expect(&format!(
29                "'{}' must be set",
30                String::from(ServerEnvironmentVariables::DataBaseURL)
31            ));
32        info!("Connecting to postgres database");
33        let connection = PgPoolOptions::new()
34            .max_connections(5)
35            .connect(&database_url)
36            .await
37            .expect("Failed to create database connection pool");
38        connection
39    })
40    .await
41}
42
/// Failures wrapping dotenv, session, database and environment-variable
/// errors, plus a template-rendering failure.
///
/// Every variant is annotated with the `invalid` code and a fixed diagnostic
/// string consumed by the `OperationOutcomeError` derive.
///
/// NOTE(review): `#[from]` presumably generates a `From` conversion from the
/// wrapped error type, and `{arg0}` presumably interpolates the variant's
/// first field — confirm both against the derive macro.
#[derive(OperationOutcomeError, Debug)]
pub enum ConfigError {
    /// Wraps a `dotenvy::Error`.
    #[error(code = "invalid", diagnostic = "Invalid environment!")]
    DotEnv(#[from] dotenvy::Error),
    /// Wraps a `tower_sessions::session::Error`.
    #[error(code = "invalid", diagnostic = "Invalid session!")]
    Session(#[from] tower_sessions::session::Error),
    /// Wraps a `sqlx::Error`.
    #[error(code = "invalid", diagnostic = "Database error")]
    Database(#[from] sqlx::Error),
    /// Wraps a `std::env::VarError`; the diagnostic includes `{arg0}`.
    #[error(code = "invalid", diagnostic = "Environment variable not set {arg0}")]
    EnvironmentVariable(#[from] VarError),
    /// Template rendering failed; carries no source error.
    #[error(code = "invalid", diagnostic = "Failed to render template.")]
    TemplateRender,
}
56
/// Failures wrapping FHIRPath and JSON deserialization errors, plus a
/// payload-free internal-server-error case.
///
/// Every variant is annotated with the `invalid` code and a fixed diagnostic
/// string consumed by the `OperationOutcomeError` derive.
///
/// NOTE(review): `#[from]` presumably generates a `From` conversion from the
/// wrapped error type — confirm against the derive macro.
#[derive(OperationOutcomeError, Debug)]
pub enum CustomOpError {
    /// Wraps a `haste_fhirpath::FHIRPathError`.
    #[error(code = "invalid", diagnostic = "FHIRPath error")]
    FHIRPath(#[from] haste_fhirpath::FHIRPathError),
    /// Wraps a `serde_json::Error`.
    #[error(code = "invalid", diagnostic = "Failed to deserialize resource")]
    Deserialize(#[from] serde_json::Error),
    /// Internal failure; carries no source error.
    #[error(code = "invalid", diagnostic = "Internal server error")]
    InternalServerError,
}
66
/// Bundle of reference-counted service handles: terminology, search engine,
/// repository, FHIR client and configuration.
///
/// The three type parameters select the concrete repository, search engine
/// and terminology implementations; the FHIR client is parameterized over the
/// same three types.
pub struct AppState<
    Repo: Repository + Send + Sync + 'static,
    Search: SearchEngine + Send + Sync + 'static,
    Terminology: FHIRTerminology + Send + Sync + 'static,
> {
    /// Terminology service handle.
    pub terminology: Arc<Terminology>,
    /// Search engine handle.
    pub search: Arc<Search>,
    /// Repository handle.
    pub repo: Arc<Repo>,
    /// FHIR client typed over the same repository, search and terminology.
    pub fhir_client: Arc<FHIRServerClient<Repo, Search, Terminology>>,
    /// Server configuration keyed by `ServerEnvironmentVariables`.
    pub config: Arc<dyn Config<ServerEnvironmentVariables>>,
}
78
79impl<
80    Repo: Repository + Send + Sync + 'static,
81    Search: SearchEngine + Send + Sync + 'static,
82    Terminology: FHIRTerminology + Send + Sync + 'static,
83> AppState<Repo, Search, Terminology>
84{
85    pub async fn transaction(&self) -> Result<Self, OperationOutcomeError> {
86        self.repo.transaction(true).await.map(|tx_repo| {
87            let tx_repo = Arc::new(tx_repo);
88            AppState {
89                terminology: self.terminology.clone(),
90                search: self.search.clone(),
91                repo: tx_repo.clone(),
92                fhir_client: Arc::new(FHIRServerClient::new(ServerClientConfig::new(
93                    tx_repo,
94                    self.search.clone(),
95                    self.terminology.clone(),
96                    self.config.clone(),
97                ))),
98                config: self.config.clone(),
99            }
100        })
101    }
102    pub async fn commit(self) -> Result<(), OperationOutcomeError> {
103        let repo = self.repo.clone();
104        drop(self);
105
106        Arc::try_unwrap(repo)
107            .map_err(|_e| {
108                OperationOutcomeError::fatal(
109                    IssueType::Exception(None),
110                    "Failed to unwrap transaction client".to_string(),
111                )
112            })?
113            .commit()
114            .await?;
115
116        Ok(())
117    }
118}
119
120pub async fn create_services(
121    config: Arc<dyn Config<ServerEnvironmentVariables>>,
122) -> Result<
123    Arc<
124        AppState<
125            PGConnection,
126            ElasticSearchEngine,
127            FHIRCanonicalTerminology<LRUCanonicalRemoteResolver<PGConnection, ElasticSearchEngine>>,
128        >,
129    >,
130    OperationOutcomeError,
131> {
132    let pool = get_pool(config.as_ref()).await;
133    let search_engine = Arc::new(
134        haste_fhir_search::elastic_search::ElasticSearchEngine::new(
135            Arc::new(FPEngine::new()),
136            &config
137                .get(ServerEnvironmentVariables::ElasticSearchURL)
138                .expect(&format!(
139                    "'{}' variable not set",
140                    String::from(ServerEnvironmentVariables::ElasticSearchURL)
141                )),
142            config
143                .get(ServerEnvironmentVariables::ElasticSearchUsername)
144                .expect(&format!(
145                    "'{}' variable not set",
146                    String::from(ServerEnvironmentVariables::ElasticSearchUsername)
147                )),
148            config
149                .get(ServerEnvironmentVariables::ElasticSearchPassword)
150                .expect(&format!(
151                    "'{}' variable not set",
152                    String::from(ServerEnvironmentVariables::ElasticSearchPassword)
153                )),
154        )
155        .expect("Failed to create Elasticsearch client"),
156    );
157
158    let repo = Arc::new(PGConnection::pool(pool.clone()));
159
160    let terminology = Arc::new(FHIRCanonicalTerminology::new(
161        resolvers::remote::LRUCanonicalRemoteResolver::new(repo.clone(), search_engine.clone()),
162    ));
163
164    let can_mutate: String = config
165        .get(ServerEnvironmentVariables::AllowArtifactMutations)
166        .unwrap_or("false".into());
167
168    let fhir_client = Arc::new(FHIRServerClient::new(if can_mutate == "true" {
169        ServerClientConfig::allow_mutate_artifacts(
170            repo.clone(),
171            search_engine.clone(),
172            terminology.clone(),
173            config.clone(),
174        )
175    } else {
176        ServerClientConfig::new(
177            repo.clone(),
178            search_engine.clone(),
179            terminology.clone(),
180            config.clone(),
181        )
182    }));
183
184    let shared_state = Arc::new(AppState {
185        config,
186        repo: repo,
187        terminology: terminology,
188        search: search_engine,
189        fhir_client,
190    });
191
192    Ok(shared_state)
193}