diff --git a/api/Cargo.lock b/api/Cargo.lock
index 0a5fd77..3c605c3 100644
--- a/api/Cargo.lock
+++ b/api/Cargo.lock
@@ -608,7 +608,7 @@ dependencies = [
]
[[package]]
-name = "crabfit_backend"
+name = "crabfit-api"
version = "2.0.0"
dependencies = [
"axum",
diff --git a/api/adaptors/README.md b/api/adaptors/README.md
index f1fbcaa..f8e7f1b 100644
--- a/api/adaptors/README.md
+++ b/api/adaptors/README.md
@@ -10,6 +10,7 @@ Note, you will need to have the following crates as dependencies in your adaptor
- `common` Includes a trait for implementing your adaptor, as well as structs your adaptor needs to return.
- `async-trait` Required because the trait from `common` uses async functions, make sure you include `#[async_trait]` above your trait implementation.
+- `chrono` Required to deal with dates in the common structs and trait function signatures.
Once you've created the adaptor, you'll need to make sure it's included as a dependency in the root [`Cargo.toml`](../Cargo.toml), and add a feature flag with the same name. Make sure you also document the new adaptor in the [api readme](../README.md).
diff --git a/api/adaptors/datastore/src/lib.rs b/api/adaptors/datastore/src/lib.rs
index 3d4e33c..dc38e1b 100644
--- a/api/adaptors/datastore/src/lib.rs
+++ b/api/adaptors/datastore/src/lib.rs
@@ -2,15 +2,10 @@ use std::{env, error::Error, fmt::Display};
use async_trait::async_trait;
use chrono::{DateTime, NaiveDateTime, Utc};
-use common::{
- adaptor::Adaptor,
- event::{Event, EventDeletion},
- person::Person,
- stats::Stats,
-};
+use common::{Adaptor, Event, Person, Stats};
use google_cloud::{
authorize::ApplicationCredentials,
- datastore::{Client, Filter, FromValue, IntoValue, Key, Query},
+ datastore::{Client, Filter, FromValue, IntoValue, Key, KeyID, Query},
};
use tokio::sync::Mutex;
@@ -95,9 +90,22 @@ impl Adaptor for DatastoreAdaptor {
))
}
- async fn upsert_person(&self, event_id: String, person: Person) -> Result {
+ async fn upsert_person(
+ &self,
+ event_id: String,
+ person: Person,
+ ) -> Result, Self::Error> {
let mut client = self.client.lock().await;
+ // Check the event exists
+ if client
+ .get::(Key::new(EVENT_KIND).id(event_id.clone()))
+ .await?
+ .is_none()
+ {
+ return Ok(None);
+ }
+
// Check if person exists
let existing_person = client
.query(
@@ -122,7 +130,7 @@ impl Adaptor for DatastoreAdaptor {
.put((key, DatastorePerson::from_person(person.clone(), event_id)))
.await?;
- Ok(person)
+ Ok(Some(person))
}
async fn get_event(&self, id: String) -> Result, Self::Error> {
@@ -151,25 +159,45 @@ impl Adaptor for DatastoreAdaptor {
Ok(event)
}
- async fn delete_event(&self, id: String) -> Result {
+ async fn delete_events(&self, cutoff: DateTime) -> Result {
let mut client = self.client.lock().await;
let mut keys_to_delete: Vec = client
- .query(
- Query::new(PERSON_KIND)
- .filter(Filter::Equal("eventId".into(), id.clone().into_value())),
- )
+ .query(Query::new(EVENT_KIND).filter(Filter::LesserThan(
+ "visited".into(),
+ cutoff.timestamp().into_value(),
+ )))
.await?
.iter()
.map(|entity| entity.key().clone())
.collect();
- let person_count = keys_to_delete.len().try_into().unwrap();
- keys_to_delete.insert(0, Key::new(EVENT_KIND).id(id.clone()));
+ let event_count = keys_to_delete.len() as i64;
+
+ let events_to_delete = keys_to_delete.clone();
+ for e in events_to_delete.iter() {
+ if let KeyID::StringID(id) = e.get_id() {
+ let mut event_people_to_delete: Vec = client
+ .query(
+ Query::new(PERSON_KIND)
+ .filter(Filter::Equal("eventId".into(), id.clone().into_value())),
+ )
+ .await?
+ .iter()
+ .map(|entity| entity.key().clone())
+ .collect();
+ keys_to_delete.append(&mut event_people_to_delete);
+ }
+ }
+
+ let person_count = keys_to_delete.len() as i64 - event_count;
client.delete_all(keys_to_delete).await?;
- Ok(EventDeletion { id, person_count })
+ Ok(Stats {
+ event_count,
+ person_count,
+ })
}
}
diff --git a/api/adaptors/memory/src/lib.rs b/api/adaptors/memory/src/lib.rs
index 200f8c4..2881696 100644
--- a/api/adaptors/memory/src/lib.rs
+++ b/api/adaptors/memory/src/lib.rs
@@ -1,13 +1,8 @@
use std::{collections::HashMap, error::Error, fmt::Display};
use async_trait::async_trait;
-use chrono::Utc;
-use common::{
- adaptor::Adaptor,
- event::{Event, EventDeletion},
- person::Person,
- stats::Stats,
-};
+use chrono::{DateTime, Utc};
+use common::{Adaptor, Event, Person, Stats};
use tokio::sync::Mutex;
struct State {
@@ -68,14 +63,23 @@ impl Adaptor for MemoryAdaptor {
))
}
- async fn upsert_person(&self, event_id: String, person: Person) -> Result {
+ async fn upsert_person(
+ &self,
+ event_id: String,
+ person: Person,
+ ) -> Result, Self::Error> {
let mut state = self.state.lock().await;
+ // Check event exists
+ if state.events.get(&event_id).is_none() {
+ return Ok(None);
+ }
+
state
.people
.insert((event_id, person.name.clone()), person.clone());
- Ok(person)
+ Ok(Some(person))
}
async fn get_event(&self, id: String) -> Result , Self::Error> {
@@ -98,21 +102,38 @@ impl Adaptor for MemoryAdaptor {
Ok(event)
}
- async fn delete_event(&self, id: String) -> Result {
+ async fn delete_events(&self, cutoff: DateTime) -> Result {
let mut state = self.state.lock().await;
- let mut person_count: u64 = state.people.len() as u64;
+ // Delete events older than cutoff date
+ let mut deleted_event_ids: Vec = Vec::new();
+ state.events = state
+ .events
+ .clone()
+ .into_iter()
+ .filter(|(id, event)| {
+ if event.visited_at >= cutoff {
+ true
+ } else {
+ deleted_event_ids.push(id.into());
+ false
+ }
+ })
+ .collect();
+
+ let mut person_count = state.people.len() as i64;
state.people = state
.people
.clone()
.into_iter()
- .filter(|((event_id, _), _)| event_id != &id)
+ .filter(|((event_id, _), _)| deleted_event_ids.contains(event_id))
.collect();
- person_count -= state.people.len() as u64;
+ person_count -= state.people.len() as i64;
- state.events.remove(&id);
-
- Ok(EventDeletion { id, person_count })
+ Ok(Stats {
+ event_count: deleted_event_ids.len() as i64,
+ person_count,
+ })
}
}
diff --git a/api/adaptors/sql/src/lib.rs b/api/adaptors/sql/src/lib.rs
index f0a9137..8a706f3 100644
--- a/api/adaptors/sql/src/lib.rs
+++ b/api/adaptors/sql/src/lib.rs
@@ -1,13 +1,8 @@
use std::{env, error::Error};
use async_trait::async_trait;
-use chrono::{DateTime as ChronoDateTime, Utc};
-use common::{
- adaptor::Adaptor,
- event::{Event, EventDeletion},
- person::Person,
- stats::Stats,
-};
+use chrono::{DateTime, Utc};
+use common::{Adaptor, Event, Person, Stats};
use entity::{event, person, stats};
use migration::{Migrator, MigratorTrait};
use sea_orm::{
@@ -70,7 +65,11 @@ impl Adaptor for SqlAdaptor {
})
}
- async fn upsert_person(&self, event_id: String, person: Person) -> Result {
+ async fn upsert_person(
+ &self,
+ event_id: String,
+ person: Person,
+ ) -> Result, Self::Error> {
let data = person::ActiveModel {
name: Set(person.name.clone()),
password_hash: Set(person.password_hash),
@@ -79,7 +78,16 @@ impl Adaptor for SqlAdaptor {
event_id: Set(event_id.clone()),
};
- Ok(
+ // Check if the event exists
+ if event::Entity::find_by_id(event_id.clone())
+ .one(&self.db)
+ .await?
+ .is_none()
+ {
+ return Ok(None);
+ }
+
+ Ok(Some(
match person::Entity::find_by_id((person.name, event_id))
.one(&self.db)
.await?
@@ -87,7 +95,7 @@ impl Adaptor for SqlAdaptor {
Some(_) => data.update(&self.db).await?.try_into_model()?.into(),
None => data.insert(&self.db).await?.try_into_model()?.into(),
},
- )
+ ))
}
async fn get_event(&self, id: String) -> Result , Self::Error> {
@@ -118,27 +126,43 @@ impl Adaptor for SqlAdaptor {
.into())
}
- async fn delete_event(&self, id: String) -> Result {
- let event_id = id.clone();
- let person_count = self
+ async fn delete_events(&self, cutoff: DateTime) -> Result {
+ let (event_count, person_count) = self
.db
- .transaction::<_, u64, DbErr>(|t| {
+ .transaction::<_, (i64, i64), DbErr>(|t| {
Box::pin(async move {
+ // Get events older than the cutoff date
+ let old_events = event::Entity::find()
+ .filter(event::Column::VisitedAt.lt(cutoff.naive_utc()))
+ .all(t)
+ .await?;
+
// Delete people
- let people_delete_result = person::Entity::delete_many()
- .filter(person::Column::EventId.eq(&event_id))
+ let mut people_deleted: i64 = 0;
+ // TODO: run concurrently
+ for e in old_events.iter() {
+ let people_delete_result = person::Entity::delete_many()
+ .filter(person::Column::EventId.eq(&e.id))
+ .exec(t)
+ .await?;
+ people_deleted += people_delete_result.rows_affected as i64;
+ }
+
+ // Delete events
+ let event_delete_result = event::Entity::delete_many()
+ .filter(event::Column::VisitedAt.lt(cutoff.naive_utc()))
.exec(t)
.await?;
- // Delete event
- event::Entity::delete_by_id(event_id).exec(t).await?;
-
- Ok(people_delete_result.rows_affected)
+ Ok((event_delete_result.rows_affected as i64, people_deleted))
})
})
.await?;
- Ok(EventDeletion { id, person_count })
+ Ok(Stats {
+ event_count,
+ person_count,
+ })
}
}
@@ -190,8 +214,8 @@ impl From for Event {
Self {
id: value.id,
name: value.name,
- created_at: ChronoDateTime::::from_utc(value.created_at, Utc),
- visited_at: ChronoDateTime::::from_utc(value.visited_at, Utc),
+ created_at: DateTime::::from_utc(value.created_at, Utc),
+ visited_at: DateTime::::from_utc(value.visited_at, Utc),
times: serde_json::from_value(value.times).unwrap_or(vec![]),
timezone: value.timezone,
}
@@ -203,7 +227,7 @@ impl From for Person {
Self {
name: value.name,
password_hash: value.password_hash,
- created_at: ChronoDateTime::::from_utc(value.created_at, Utc),
+ created_at: DateTime::::from_utc(value.created_at, Utc),
availability: serde_json::from_value(value.availability).unwrap_or(vec![]),
}
}
diff --git a/api/common/README.md b/api/common/README.md
index 3ca3179..864ea8b 100644
--- a/api/common/README.md
+++ b/api/common/README.md
@@ -1,3 +1,3 @@
# Common
-This crate contains the [adaptor trait](./src/adaptor.rs), and structs that are used by it. These are separated into their own crate so that the root crate and the adaptors can import from it without causing a circular dependency.
+This crate contains the adaptor trait, and structs that are used by it. These are separated into their own crate so that the root crate and the adaptors can import from it without causing a circular dependency.
diff --git a/api/common/src/adaptor.rs b/api/common/src/adaptor.rs
deleted file mode 100644
index 73da286..0000000
--- a/api/common/src/adaptor.rs
+++ /dev/null
@@ -1,30 +0,0 @@
-use std::error::Error;
-
-use async_trait::async_trait;
-
-use crate::{
- event::{Event, EventDeletion},
- person::Person,
- stats::Stats,
-};
-
-/// Data storage adaptor, all methods on an adaptor can return an error if
-/// something goes wrong, or potentially None if the data requested was not found.
-#[async_trait]
-pub trait Adaptor: Send + Sync {
- type Error: Error;
-
- async fn get_stats(&self) -> Result;
- async fn increment_stat_event_count(&self) -> Result;
- async fn increment_stat_person_count(&self) -> Result;
-
- async fn get_people(&self, event_id: String) -> Result>, Self::Error>;
- async fn upsert_person(&self, event_id: String, person: Person) -> Result;
-
- /// Get an event and update visited date to current time
- async fn get_event(&self, id: String) -> Result, Self::Error>;
- async fn create_event(&self, event: Event) -> Result;
-
- /// Delete an event as well as all related people
- async fn delete_event(&self, id: String) -> Result;
-}
diff --git a/api/common/src/event.rs b/api/common/src/event.rs
deleted file mode 100644
index 1b2fd07..0000000
--- a/api/common/src/event.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-use chrono::{DateTime, Utc};
-
-#[derive(Clone)]
-pub struct Event {
- pub id: String,
- pub name: String,
- pub created_at: DateTime,
- pub visited_at: DateTime,
- pub times: Vec,
- pub timezone: String,
-}
-
-#[derive(Clone)]
-/// Info about a deleted event
-pub struct EventDeletion {
- pub id: String,
- /// The amount of people that were in this event that were also deleted
- pub person_count: u64,
-}
diff --git a/api/common/src/lib.rs b/api/common/src/lib.rs
index ae9caf1..5c485a4 100644
--- a/api/common/src/lib.rs
+++ b/api/common/src/lib.rs
@@ -1,4 +1,54 @@
-pub mod adaptor;
-pub mod event;
-pub mod person;
-pub mod stats;
+use std::error::Error;
+
+use async_trait::async_trait;
+use chrono::{DateTime, Utc};
+
+/// Data storage adaptor, all methods on an adaptor can return an error if
+/// something goes wrong, or potentially None if the data requested was not found.
+#[async_trait]
+pub trait Adaptor: Send + Sync {
+ type Error: Error;
+
+ async fn get_stats(&self) -> Result;
+ async fn increment_stat_event_count(&self) -> Result;
+ async fn increment_stat_person_count(&self) -> Result;
+
+ async fn get_people(&self, event_id: String) -> Result>, Self::Error>;
+ async fn upsert_person(
+ &self,
+ event_id: String,
+ person: Person,
+ ) -> Result , Self::Error>;
+
+ /// Get an event and update visited date to current time
+ async fn get_event(&self, id: String) -> Result , Self::Error>;
+ async fn create_event(&self, event: Event) -> Result;
+
+ /// Delete events older than a cutoff date, as well as any associated people
+ /// Returns the amount of events and people deleted
+ async fn delete_events(&self, cutoff: DateTime) -> Result;
+}
+
+#[derive(Clone)]
+pub struct Stats {
+ pub event_count: i64,
+ pub person_count: i64,
+}
+
+#[derive(Clone)]
+pub struct Event {
+ pub id: String,
+ pub name: String,
+ pub created_at: DateTime,
+ pub visited_at: DateTime,
+ pub times: Vec,
+ pub timezone: String,
+}
+
+#[derive(Clone)]
+pub struct Person {
+ pub name: String,
+ pub password_hash: Option,
+ pub created_at: DateTime,
+ pub availability: Vec,
+}
diff --git a/api/common/src/person.rs b/api/common/src/person.rs
deleted file mode 100644
index fd19b76..0000000
--- a/api/common/src/person.rs
+++ /dev/null
@@ -1,9 +0,0 @@
-use chrono::{DateTime, Utc};
-
-#[derive(Clone)]
-pub struct Person {
- pub name: String,
- pub password_hash: Option,
- pub created_at: DateTime,
- pub availability: Vec,
-}
diff --git a/api/common/src/stats.rs b/api/common/src/stats.rs
deleted file mode 100644
index a88d949..0000000
--- a/api/common/src/stats.rs
+++ /dev/null
@@ -1,5 +0,0 @@
-#[derive(Clone)]
-pub struct Stats {
- pub event_count: i64,
- pub person_count: i64,
-}
diff --git a/api/src/docs.rs b/api/src/docs.rs
index 2ab310d..ae3ad11 100644
--- a/api/src/docs.rs
+++ b/api/src/docs.rs
@@ -17,6 +17,7 @@ use utoipa::{
routes::person::get_people,
routes::person::get_person,
routes::person::update_person,
+ routes::tasks::cleanup,
),
components(schemas(
payloads::StatsResponse,
diff --git a/api/src/errors.rs b/api/src/errors.rs
index 47719e0..fe1a1dc 100644
--- a/api/src/errors.rs
+++ b/api/src/errors.rs
@@ -1,5 +1,5 @@
use axum::{http::StatusCode, response::IntoResponse};
-use common::adaptor::Adaptor;
+use common::Adaptor;
pub enum ApiError {
AdaptorError(A::Error),
diff --git a/api/src/main.rs b/api/src/main.rs
index 2dd0788..44bca8b 100644
--- a/api/src/main.rs
+++ b/api/src/main.rs
@@ -82,6 +82,7 @@ async fn main() {
"/event/:event_id/people/:person_name",
patch(person::update_person),
)
+ .route("/tasks/cleanup", patch(tasks::cleanup))
.with_state(shared_state)
.layer(cors)
.layer(rate_limit)
diff --git a/api/src/payloads.rs b/api/src/payloads.rs
index 089a2b8..6bf0176 100644
--- a/api/src/payloads.rs
+++ b/api/src/payloads.rs
@@ -1,5 +1,5 @@
use axum::Json;
-use common::{event::Event, person::Person, stats::Stats};
+use common::{Event, Person, Stats};
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
diff --git a/api/src/routes/event.rs b/api/src/routes/event.rs
index 8f7945c..934b57c 100644
--- a/api/src/routes/event.rs
+++ b/api/src/routes/event.rs
@@ -3,7 +3,7 @@ use axum::{
http::StatusCode,
Json,
};
-use common::{adaptor::Adaptor, event::Event};
+use common::{Adaptor, Event};
use rand::{seq::SliceRandom, thread_rng, Rng};
use regex::Regex;
diff --git a/api/src/routes/mod.rs b/api/src/routes/mod.rs
index 5a4889d..7e1429b 100644
--- a/api/src/routes/mod.rs
+++ b/api/src/routes/mod.rs
@@ -1,3 +1,4 @@
pub mod event;
pub mod person;
pub mod stats;
+pub mod tasks;
diff --git a/api/src/routes/person.rs b/api/src/routes/person.rs
index 28c8051..974a519 100644
--- a/api/src/routes/person.rs
+++ b/api/src/routes/person.rs
@@ -4,7 +4,7 @@ use axum::{
Json, TypedHeader,
};
use base64::{engine::general_purpose, Engine};
-use common::{adaptor::Adaptor, person::Person};
+use common::{Adaptor, Person};
use crate::{
errors::ApiError,
@@ -120,6 +120,7 @@ pub async fn get_person(
)
.await
.map_err(ApiError::AdaptorError)?
+ .unwrap()
.into(),
))
}
@@ -189,6 +190,7 @@ pub async fn update_person(
)
.await
.map_err(ApiError::AdaptorError)?
+ .unwrap()
.into(),
))
}
diff --git a/api/src/routes/stats.rs b/api/src/routes/stats.rs
index 46398ce..539fffa 100644
--- a/api/src/routes/stats.rs
+++ b/api/src/routes/stats.rs
@@ -1,5 +1,5 @@
use axum::{extract, Json};
-use common::adaptor::Adaptor;
+use common::Adaptor;
use crate::{
errors::ApiError,
diff --git a/api/src/routes/tasks.rs b/api/src/routes/tasks.rs
new file mode 100644
index 0000000..4017f6d
--- /dev/null
+++ b/api/src/routes/tasks.rs
@@ -0,0 +1,40 @@
+use std::env;
+
+use axum::{extract, http::HeaderMap};
+use common::Adaptor;
+use tracing::info;
+
+use crate::{errors::ApiError, State};
+
+#[utoipa::path(
+ get,
+ path = "/tasks/cleanup",
+ responses(
+ (status = 200, description = "Cleanup complete"),
+ (status = 401, description = "Missing or incorrect X-Cron-Key header"),
+ (status = 429, description = "Too many requests"),
+ ),
+ tag = "tasks",
+)]
+/// Delete events older than 3 months
+pub async fn cleanup(
+ extract::State(state): State,
+ headers: HeaderMap,
+) -> Result<(), ApiError > {
+ // Check cron key
+ let cron_key = headers.get("X-Cron-Key").ok_or(ApiError::NotAuthorized)?;
+ if let Ok(key) = env::var("CRON_KEY") {
+ if !key.is_empty() && *cron_key != key {
+ return Err(ApiError::NotAuthorized);
+ }
+ }
+
+ info!("Running cleanup task");
+
+ let adaptor = &state.lock().await.adaptor;
+
+ // TODO:
+ //let stats = adaptor.get_stats().await.map_err(ApiError::AdaptorError)?;
+
+ Ok(())
+}