Skip to main content

haste_worker/
lib.rs

1use crate::indexing_lock::IndexLockProvider;
2use haste_config::get_config;
3use haste_fhir_model::r4::generated::resources::ResourceTypeError;
4use haste_fhir_operation_error::{OperationOutcomeError, derive::OperationOutcomeError};
5use haste_fhir_search::{
6    IndexResource, SearchEngine,
7    elastic_search::{
8        ElasticSearchEngine, create_es_client,
9        search_parameter_resolver::ElasticSearchParameterResolver,
10    },
11};
12use haste_fhirpath::FHIRPathError;
13use haste_jwt::{TenantId, VersionId};
14use haste_repository::{fhir::FHIRRepository, types::SupportedFHIRVersions};
15use sqlx::{Pool, Postgres, query_as, types::time::OffsetDateTime};
16use std::sync::Arc;
17use tokio::sync::Mutex;
18
19mod indexing_lock;
20
21#[derive(OperationOutcomeError, Debug)]
22pub enum IndexingWorkerError {
23    #[fatal(code = "exception", diagnostic = "Database error: '{arg0}'")]
24    DatabaseConnectionError(#[from] sqlx::Error),
25    #[fatal(code = "exception", diagnostic = "Lock error: '{arg0}'")]
26    OperationError(#[from] OperationOutcomeError),
27    #[fatal(code = "exception", diagnostic = "Elasticsearch error: '{arg0}'")]
28    ElasticsearchError(#[from] elasticsearch::Error),
29    #[fatal(code = "exception", diagnostic = "FHIRPath error: '{arg0}'")]
30    FHIRPathError(#[from] FHIRPathError),
31    #[fatal(
32        code = "exception",
33        diagnostic = "Missing search parameters for resource: '{arg0}'"
34    )]
35    MissingSearchParameters(String),
36    #[fatal(
37        code = "exception",
38        diagnostic = "Fatal error occurred during indexing"
39    )]
40    Fatal,
41    #[fatal(
42        code = "exception",
43        diagnostic = "Artifact error: Invalid resource type '{arg0}'"
44    )]
45    ResourceTypeError(#[from] ResourceTypeError),
46}
47
48struct TenantReturn {
49    id: TenantId,
50    created_at: OffsetDateTime,
51}
52
53async fn get_tenants(
54    client: &Pool<Postgres>,
55    cursor: &OffsetDateTime,
56    count: usize,
57) -> Result<Vec<TenantReturn>, OperationOutcomeError> {
58    let result = query_as!(
59        TenantReturn,
60        r#"SELECT id as "id: TenantId", created_at FROM tenants WHERE created_at > $1 ORDER BY created_at DESC LIMIT $2"#,
61        cursor,
62        count as i64
63    )
64    .fetch_all(client)
65    .await
66    .map_err(IndexingWorkerError::from)?;
67
68    Ok(result)
69}
70
71static TOTAL_INDEXED: std::sync::LazyLock<Mutex<usize>> =
72    std::sync::LazyLock::new(|| Mutex::new(0));
73
74async fn index_tenant_next_sequence<
75    Repo: FHIRRepository + IndexLockProvider,
76    Engine: SearchEngine,
77>(
78    search_client: Arc<Engine>,
79    tx: &Repo,
80    tenant_id: &TenantId,
81) -> Result<(), IndexingWorkerError> {
82    let start = std::time::Instant::now();
83    let tenant_locks = tx.get_available_locks(vec![tenant_id]).await?;
84
85    if tenant_locks.is_empty() {
86        return Ok(());
87    }
88
89    let resources = tx
90        .get_sequence(
91            tenant_id,
92            tenant_locks[0].index_sequence_position as u64,
93            Some(1000),
94        )
95        .await?;
96
97    let resources_total = resources.len();
98    let start_sequence = resources.first().map(|r| r.sequence);
99    let last_value = resources.last().cloned();
100
101    // Perform indexing if there are resources to index.
102    if !resources.is_empty() {
103        let result = search_client
104            .index(
105                SupportedFHIRVersions::R4,
106                resources
107                    .into_iter()
108                    .map(|r| IndexResource {
109                        tenant: r.tenant,
110                        id: r.id,
111                        version_id: VersionId::new(r.version_id),
112                        project: r.project,
113                        fhir_method: r.fhir_method,
114                        resource_type: r.resource_type,
115                        resource: r.resource.0,
116                    })
117                    .collect(),
118            )
119            .await?;
120
121        if result.0 != resources_total {
122            tracing::error!(
123                "Indexed resource count '{}' does not match retrieved resource count '{}'",
124                result.0,
125                resources_total
126            );
127            return Err(IndexingWorkerError::Fatal);
128        }
129
130        if let Some(resource) = last_value {
131            let diff = (resource.sequence + 1) - start_sequence.unwrap_or(0);
132            let total = resources_total;
133
134            if total != diff as usize {
135                tracing::event!(
136                    tracing::Level::INFO,
137                    // safe_seq = resource.max_safe_seq.unwrap_or(0),
138                    first_seq = start_sequence.unwrap_or(0),
139                    last_seq = resource.sequence,
140                    total = resources_total,
141                    diff = (resource.sequence + 1) - start_sequence.unwrap_or(0)
142                );
143            }
144
145            tx.update_lock(tenant_id.as_ref(), resource.sequence as usize)
146                .await?;
147
148            let elapsed = start.elapsed();
149            tracing::info!(
150                "Indexed {} resources for tenant '{}' in {:.2?} (up to sequence {})",
151                result.0,
152                tenant_id.as_ref(),
153                elapsed,
154                resource.sequence
155            );
156        }
157
158        *(TOTAL_INDEXED.lock().await) += result.0;
159    }
160
161    Ok(())
162}
163
164async fn index_for_tenant<Search: SearchEngine, Repository: FHIRRepository + IndexLockProvider>(
165    repo: Arc<Repository>,
166    search_client: Arc<Search>,
167    tenant_id: &TenantId,
168) -> Result<(), IndexingWorkerError> {
169    let search_client = search_client.clone();
170
171    let tx = repo.transaction(false).await.unwrap();
172
173    let res = index_tenant_next_sequence(search_client, &tx, &tenant_id).await;
174
175    match res {
176        Ok(res) => {
177            tx.commit().await?;
178            Ok(res)
179        }
180        Err(e) => {
181            tx.rollback().await?;
182            Err(e)
183        }
184    }
185}
186
/// Configuration keys read by the indexing worker at start-up.
pub enum IndexingWorkerEnvironmentVariables {
    /// Postgres connection string (`DATABASE_URL`).
    DatabaseURL,
    /// Elasticsearch endpoint (`ELASTICSEARCH_URL`).
    ElasticSearchURL,
    /// Elasticsearch user name (`ELASTICSEARCH_USERNAME`).
    ElasticSearchUsername,
    /// Elasticsearch password (`ELASTICSEARCH_PASSWORD`).
    ElasticSearchPassword,
}

/// Converts a configuration key into its environment-variable name.
impl From<IndexingWorkerEnvironmentVariables> for String {
    fn from(value: IndexingWorkerEnvironmentVariables) -> Self {
        type Var = IndexingWorkerEnvironmentVariables;

        let key: &str = match value {
            Var::DatabaseURL => "DATABASE_URL",
            Var::ElasticSearchURL => "ELASTICSEARCH_URL",
            Var::ElasticSearchUsername => "ELASTICSEARCH_USERNAME",
            Var::ElasticSearchPassword => "ELASTICSEARCH_PASSWORD",
        };
        key.to_owned()
    }
}
208
209pub async fn run_worker() -> Result<(), OperationOutcomeError> {
210    let config = get_config::<IndexingWorkerEnvironmentVariables>("environment".into());
211    let fp_engine = Arc::new(haste_fhirpath::FPEngine::new());
212
213    let pg_pool = sqlx::PgPool::connect(
214        &config
215            .get(IndexingWorkerEnvironmentVariables::DatabaseURL)
216            .unwrap(),
217    )
218    .await
219    .expect("Failed to connect to the database");
220    let repo = Arc::new(haste_repository::pg::PGConnection::pool(pg_pool.clone()));
221    let es_client = create_es_client(
222        &config
223            .get(IndexingWorkerEnvironmentVariables::ElasticSearchURL)
224            .expect(&format!(
225                "'{}' variable not set",
226                String::from(IndexingWorkerEnvironmentVariables::ElasticSearchURL)
227            )),
228        config
229            .get(IndexingWorkerEnvironmentVariables::ElasticSearchUsername)
230            .expect(&format!(
231                "'{}' variable not set",
232                String::from(IndexingWorkerEnvironmentVariables::ElasticSearchUsername)
233            )),
234        config
235            .get(IndexingWorkerEnvironmentVariables::ElasticSearchPassword)
236            .expect(&format!(
237                "'{}' variable not set",
238                String::from(IndexingWorkerEnvironmentVariables::ElasticSearchPassword)
239            )),
240    )
241    .expect("Failed to create Elasticsearch client");
242
243    let search_engine = Arc::new(ElasticSearchEngine::new(
244        Arc::new(ElasticSearchParameterResolver::new(
245            es_client.clone(),
246            repo.clone(),
247        )),
248        fp_engine.clone(),
249        es_client,
250    ));
251
252    let mut attempts = 0;
253    while !search_engine.is_connected().await.is_ok() && attempts < 5 {
254        tracing::error!("Elasticsearch is not connected, retrying in 5 seconds...");
255        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
256        attempts += 1;
257    }
258
259    let mut cursor = OffsetDateTime::UNIX_EPOCH;
260    let tenants_limit: usize = 100;
261
262    tracing::info!("Starting indexing worker...");
263
264    let mut k = *TOTAL_INDEXED.lock().await;
265
266    loop {
267        let tenants_to_check = get_tenants(&pg_pool, &cursor, tenants_limit).await;
268        if let Ok(tenants_to_check) = tenants_to_check {
269            if tenants_to_check.is_empty() || tenants_to_check.len() < tenants_limit {
270                cursor = OffsetDateTime::UNIX_EPOCH; // Reset cursor if no tenants found
271            } else {
272                cursor = tenants_to_check[0].created_at;
273            }
274
275            for tenant in tenants_to_check {
276                let result =
277                    index_for_tenant(repo.clone(), search_engine.clone(), &tenant.id).await;
278
279                if let Err(_error) = result {
280                    tracing::error!(
281                        "Failed to index tenant: '{}' cause: '{:?}'",
282                        &tenant.id,
283                        _error
284                    );
285                }
286            }
287        } else if let Err(error) = tenants_to_check {
288            tracing::error!("Failed to retrieve tenants: {:?}", error);
289        }
290
291        if k != *TOTAL_INDEXED.lock().await {
292            k = *TOTAL_INDEXED.lock().await;
293            tracing::info!("TOTAL INDEXED SO FAR: {}", k);
294        }
295    }
296}