Ada
103479668d
All checks were successful
ci/woodpecker/push/lint Pipeline was successful
Co-authored-by: Ada <ada@gnous.eu> Co-committed-by: Ada <ada@gnous.eu>
47 lines
1.2 KiB
Python
Executable file
47 lines
1.2 KiB
Python
Executable file
"""Manage db connection and insert."""
|
|
import logging
|
|
|
|
from redis import Redis
|
|
|
|
from paste import config
|
|
|
|
|
|
def connect_redis() -> Redis:
|
|
"""
|
|
Connect to redis.
|
|
:return: Redis connection object.
|
|
"""
|
|
return Redis(
|
|
host=config.REDIS_HOST, port=config.REDIS_PORT, db=0, decode_responses=True
|
|
)
|
|
|
|
|
|
def check_content_exist(key: str) -> bool:
|
|
"""
|
|
Verify if key already exist in redis db.
|
|
:param key: Key to check.
|
|
:return: True if existed, else false.
|
|
"""
|
|
db = connect_redis()
|
|
return db.exists(key)
|
|
|
|
|
|
def insert_content_db(url_id: str, expiration: int, content: str, secret: str) -> None:
|
|
"""
|
|
|
|
:param secret: Secret key for delete url
|
|
:param url_id: key for access to content.
|
|
:param expiration: Content expiration in second. Set 0 if no expiration.
|
|
:param content: Paste content
|
|
:return: None
|
|
"""
|
|
if not check_content_exist(url_id):
|
|
db = connect_redis()
|
|
data = {"content": content, "secret": secret}
|
|
for key, key_content in data.items():
|
|
db.hset(url_id, key, key_content)
|
|
if expiration != 0:
|
|
db.expire(url_id, expiration)
|
|
else:
|
|
logging.error(f"Key : {url_id} already exist") # noqa: G004
|