package store import ( "database/sql" "encoding/json" "errors" "strings" "time" _ "github.com/jackc/pgx/v5/stdlib" ) type PostgresStore struct { db *sql.DB } func NewPostgresStore(databaseURL string) (*PostgresStore, error) { db, err := sql.Open("pgx", databaseURL) if err != nil { return nil, err } if err := db.Ping(); err != nil { return nil, err } return &PostgresStore{db: db}, nil } func (s *PostgresStore) Close() error { return s.db.Close() } func (s *PostgresStore) UpsertUser(user User) { _, _ = s.db.Exec( `INSERT INTO users (public_key, inviter, created_at) VALUES ($1, $2, $3) ON CONFLICT (public_key) DO UPDATE SET inviter = EXCLUDED.inviter`, user.PublicKey, nullStr(user.Inviter), user.CreatedAt, ) } func (s *PostgresStore) GetUser(publicKey string) (User, error) { var u User err := s.db.QueryRow( `SELECT public_key, COALESCE(inviter,''), created_at FROM users WHERE public_key = $1`, publicKey, ).Scan(&u.PublicKey, &u.Inviter, &u.CreatedAt) if errors.Is(err, sql.ErrNoRows) { return User{}, ErrNotFound } if err != nil { return User{}, err } return u, nil } func (s *PostgresStore) CreateChallenge(ch Challenge) error { _, err := s.db.Exec( `INSERT INTO challenges (nonce, public_key, ip, expires_at, used) VALUES ($1, $2, $3, $4, $5)`, ch.Nonce, ch.PublicKey, nullStr(ch.IP), ch.ExpiresAt, ch.Used, ) if err != nil { if isUniqueViolation(err) { return ErrAlreadyExists } return err } return nil } func (s *PostgresStore) GetChallenge(nonce string) (Challenge, error) { var ch Challenge err := s.db.QueryRow( `SELECT nonce, public_key, COALESCE(ip,''), expires_at, used FROM challenges WHERE nonce = $1`, nonce, ).Scan(&ch.Nonce, &ch.PublicKey, &ch.IP, &ch.ExpiresAt, &ch.Used) if errors.Is(err, sql.ErrNoRows) { return Challenge{}, ErrNotFound } if err != nil { return Challenge{}, err } return ch, nil } func (s *PostgresStore) MarkChallengeUsed(nonce string) error { res, err := s.db.Exec(`UPDATE challenges SET used = true WHERE nonce = $1`, nonce) if err != nil { return err } n, _ := res.RowsAffected() if n == 0 { return ErrNotFound } return nil } func (s *PostgresStore) SaveSession(session Session) { _, _ = s.db.Exec( `INSERT INTO sessions (token, public_key, expires_at) VALUES ($1, $2, $3) ON CONFLICT (token) DO UPDATE SET public_key = EXCLUDED.public_key, expires_at = EXCLUDED.expires_at`, session.Token, session.PublicKey, session.ExpiresAt, ) } func (s *PostgresStore) GetSession(token string) (Session, error) { var sess Session err := s.db.QueryRow( `SELECT token, public_key, expires_at FROM sessions WHERE token = $1`, token, ).Scan(&sess.Token, &sess.PublicKey, &sess.ExpiresAt) if errors.Is(err, sql.ErrNoRows) { return Session{}, ErrNotFound } if err != nil { return Session{}, err } return sess, nil } func (s *PostgresStore) SaveInvitation(inv Invitation) error { _, err := s.db.Exec( `INSERT INTO invitations (jti, inviter_public_key, invitee_public_key, expires_at, max_uses, used_count) VALUES ($1, $2, $3, $4, $5, $6)`, inv.JTI, inv.InviterPublicKey, nullStr(inv.InviteePublicKey), inv.ExpiresAt, inv.MaxUses, inv.UsedCount, ) if err != nil { if isUniqueViolation(err) { return ErrAlreadyExists } return err } return nil } func (s *PostgresStore) GetInvitation(jti string) (Invitation, error) { var inv Invitation var invitee sql.NullString err := s.db.QueryRow( `SELECT jti, inviter_public_key, invitee_public_key, expires_at, max_uses, used_count FROM invitations WHERE jti = $1`, jti, ).Scan(&inv.JTI, &inv.InviterPublicKey, &invitee, &inv.ExpiresAt, &inv.MaxUses, &inv.UsedCount) if errors.Is(err, sql.ErrNoRows) { return Invitation{}, ErrNotFound } if err != nil { return Invitation{}, err } inv.InviteePublicKey = invitee.String return inv, nil } func (s *PostgresStore) IncrementInvitationUsage(jti string) error { res, err := s.db.Exec(`UPDATE invitations SET used_count = used_count + 1 WHERE jti = $1`, jti) if err != nil { return err } n, _ := res.RowsAffected() if n == 0 { return ErrNotFound } return nil } func (s *PostgresStore) SaveCollection(c Collection) { _, _ = s.db.Exec( `INSERT INTO collections (id, owner_key, name, created_at) VALUES ($1, $2, $3, $4) ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name`, c.ID, c.OwnerKey, c.Name, c.CreatedAt, ) } func (s *PostgresStore) ListCollectionsByOwner(owner string) []Collection { rows, err := s.db.Query( `SELECT id, owner_key, name, created_at FROM collections WHERE owner_key = $1 ORDER BY created_at`, owner, ) if err != nil { return nil } defer rows.Close() var result []Collection for rows.Next() { var c Collection if err := rows.Scan(&c.ID, &c.OwnerKey, &c.Name, &c.CreatedAt); err != nil { return result } result = append(result, c) } return result } func (s *PostgresStore) GetCollection(id string) (Collection, error) { var c Collection err := s.db.QueryRow( `SELECT id, owner_key, name, created_at FROM collections WHERE id = $1`, id, ).Scan(&c.ID, &c.OwnerKey, &c.Name, &c.CreatedAt) if errors.Is(err, sql.ErrNoRows) { return Collection{}, ErrNotFound } if err != nil { return Collection{}, err } return c, nil } func (s *PostgresStore) DeleteCollection(id string) error { res, err := s.db.Exec(`DELETE FROM collections WHERE id = $1`, id) if err != nil { return err } n, _ := res.RowsAffected() if n == 0 { return ErrNotFound } return nil } func (s *PostgresStore) SaveFeature(f Feature) { geom, _ := json.Marshal(f.Geometry) props, _ := json.Marshal(f.Properties) z := 0.0 if len(f.Geometry.Coordinates) >= 3 { z = f.Geometry.Coordinates[2] } _, _ = s.db.Exec( `INSERT INTO features (id, collection_id, owner_key, type, geometry, properties, created_at, updated_at, geom) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, ST_SetSRID(ST_MakePoint($9, $10, $11), 4326)) ON CONFLICT (id) DO UPDATE SET geometry = EXCLUDED.geometry, properties = EXCLUDED.properties, updated_at = EXCLUDED.updated_at, geom = EXCLUDED.geom`, f.ID, f.CollectionID, f.OwnerKey, f.Type, geom, props, f.CreatedAt, f.UpdatedAt, f.Geometry.Coordinates[0], f.Geometry.Coordinates[1], z, ) } func (s *PostgresStore) ListFeaturesByCollection(collectionID string) []Feature { rows, err := s.db.Query( `SELECT id, collection_id, owner_key, type, geometry, properties, created_at, updated_at FROM features WHERE collection_id = $1 ORDER BY created_at`, collectionID, ) if err != nil { return nil } defer rows.Close() var result []Feature for rows.Next() { var f Feature var geom, props []byte if err := rows.Scan(&f.ID, &f.CollectionID, &f.OwnerKey, &f.Type, &geom, &props, &f.CreatedAt, &f.UpdatedAt); err != nil { return result } _ = json.Unmarshal(geom, &f.Geometry) _ = json.Unmarshal(props, &f.Properties) result = append(result, f) } return result } func (s *PostgresStore) GetFeature(featureID string) (Feature, error) { var f Feature var geom, props []byte err := s.db.QueryRow( `SELECT id, collection_id, owner_key, type, geometry, properties, created_at, updated_at FROM features WHERE id = $1`, featureID, ).Scan(&f.ID, &f.CollectionID, &f.OwnerKey, &f.Type, &geom, &props, &f.CreatedAt, &f.UpdatedAt) if errors.Is(err, sql.ErrNoRows) { return Feature{}, ErrNotFound } if err != nil { return Feature{}, err } _ = json.Unmarshal(geom, &f.Geometry) _ = json.Unmarshal(props, &f.Properties) return f, nil } func (s *PostgresStore) DeleteFeature(featureID string) error { res, err := s.db.Exec(`DELETE FROM features WHERE id = $1`, featureID) if err != nil { return err } n, _ := res.RowsAffected() if n == 0 { return ErrNotFound } return nil } func (s *PostgresStore) SaveUserLogin(ul UserLogin) { _, _ = s.db.Exec( `INSERT INTO user_logins (public_key, ip, created_at) VALUES ($1, $2, $3)`, ul.PublicKey, nullStr(ul.IP), ul.CreatedAt, ) } func (s *PostgresStore) SaveAsset(a Asset) { _, _ = s.db.Exec( `INSERT INTO assets (id, owner_key, checksum, ext, kind, mime_type, size_bytes, object_key, is_public, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (id) DO UPDATE SET kind = EXCLUDED.kind, mime_type = EXCLUDED.mime_type, size_bytes = EXCLUDED.size_bytes, is_public = EXCLUDED.is_public, updated_at = EXCLUDED.updated_at`, a.ID, a.OwnerKey, a.Checksum, a.Ext, a.Kind, nullStr(a.MimeType), a.SizeBytes, a.ObjectKey, a.IsPublic, a.CreatedAt, a.UpdatedAt, ) } func (s *PostgresStore) GetAsset(assetID string) (Asset, error) { var a Asset var mimeType sql.NullString err := s.db.QueryRow( `SELECT id, owner_key, checksum, ext, kind, mime_type, size_bytes, object_key, is_public, created_at, updated_at FROM assets WHERE id = $1`, assetID, ).Scan(&a.ID, &a.OwnerKey, &a.Checksum, &a.Ext, &a.Kind, &mimeType, &a.SizeBytes, &a.ObjectKey, &a.IsPublic, &a.CreatedAt, &a.UpdatedAt) if errors.Is(err, sql.ErrNoRows) { return Asset{}, ErrNotFound } if err != nil { return Asset{}, err } a.MimeType = mimeType.String return a, nil } func (s *PostgresStore) GetAssetByOwnerChecksumExt(ownerKey, checksum, ext string) (Asset, error) { var a Asset var mimeType sql.NullString err := s.db.QueryRow( `SELECT id, owner_key, checksum, ext, kind, mime_type, size_bytes, object_key, is_public, created_at, updated_at FROM assets WHERE owner_key = $1 AND checksum = $2 AND ext = $3`, ownerKey, checksum, ext, ).Scan(&a.ID, &a.OwnerKey, &a.Checksum, &a.Ext, &a.Kind, &mimeType, &a.SizeBytes, &a.ObjectKey, &a.IsPublic, &a.CreatedAt, &a.UpdatedAt) if errors.Is(err, sql.ErrNoRows) { return Asset{}, ErrNotFound } if err != nil { return Asset{}, err } a.MimeType = mimeType.String return a, nil } func (s *PostgresStore) SetAssetPublic(assetID string, isPublic bool) error { res, err := s.db.Exec(`UPDATE assets SET is_public = $2, updated_at = NOW() WHERE id = $1`, assetID, isPublic) if err != nil { return err } n, _ := res.RowsAffected() if n == 0 { return ErrNotFound } return nil } func (s *PostgresStore) LinkAssetToFeature(featureID, assetID, name, description string) error { _, err := s.db.Exec( `INSERT INTO feature_asset_links (feature_id, asset_id, name, description) VALUES ($1, $2, $3, $4) ON CONFLICT (feature_id, asset_id) DO UPDATE SET name = EXCLUDED.name, description = EXCLUDED.description`, featureID, assetID, nullStr(name), nullStr(description), ) return err } func (s *PostgresStore) UnlinkAssetFromFeature(featureID, assetID string) error { res, err := s.db.Exec(`DELETE FROM feature_asset_links WHERE feature_id = $1 AND asset_id = $2`, featureID, assetID) if err != nil { return err } n, _ := res.RowsAffected() if n == 0 { return ErrNotFound } return nil } func (s *PostgresStore) ListAssetsByFeature(featureID string) []FeatureAsset { rows, err := s.db.Query( `SELECT a.id, a.owner_key, a.checksum, a.ext, a.kind, COALESCE(a.mime_type, ''), a.size_bytes, a.object_key, a.is_public, a.created_at, a.updated_at, l.feature_id, COALESCE(l.name, ''), COALESCE(l.description, ''), l.created_at FROM feature_asset_links l JOIN assets a ON a.id = l.asset_id WHERE l.feature_id = $1 ORDER BY l.created_at`, featureID, ) if err != nil { return nil } defer rows.Close() result := make([]FeatureAsset, 0) for rows.Next() { var fa FeatureAsset if err := rows.Scan( &fa.ID, &fa.OwnerKey, &fa.Checksum, &fa.Ext, &fa.Kind, &fa.MimeType, &fa.SizeBytes, &fa.ObjectKey, &fa.IsPublic, &fa.CreatedAt, &fa.UpdatedAt, &fa.FeatureID, &fa.Name, &fa.Description, &fa.LinkedAt, ); err != nil { return result } result = append(result, fa) } return result } func (s *PostgresStore) PruneExpired(now time.Time) { _, _ = s.db.Exec(`DELETE FROM challenges WHERE expires_at < $1`, now) _, _ = s.db.Exec(`DELETE FROM sessions WHERE expires_at < $1`, now) _, _ = s.db.Exec(`DELETE FROM invitations WHERE expires_at < $1`, now) } func nullStr(s string) interface{} { if s == "" { return nil } return s } func isUniqueViolation(err error) bool { if err == nil { return false } s := err.Error() return strings.Contains(s, "duplicate key") || strings.Contains(s, "23505") }