Uploader des gros fichiers en asynchrone avec Celery
Hello,
Je suis en train de m'arracher les cheveux avec Celery et Django.
Dans mon app, je dois gérer de gros uploads de fichiers (jusqu'à 100Mo) qui sont hébergés sur un S3.
J'essaye de passer ce processus d'upload en background avec Celery car mon app étant déployée sur Azure, j'ai un timeout max de 240s, sinon l'appli plante.
Celery prend par défaut du JSON en paramètre, cf la config :
CELERY_BROKER_URL = env("REDIS_BROKER_URL")
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/Paris'
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
Voici un exemple de code issu de ma vue :
if request.method != 'POST':
return render(request, 'reports/single-report.html',
context={'slug': slug, 'report': report, 'transporters': transporters,
'transporters_complete': transporters_complete,
'transporter_form': transporter_form, 'missing_transporters': missing_transporters,
'existing_transporters_files_pk': existing_transporters_files_pk,
'list_theoretical_prices': list_theoretical_prices,
'list_calculated_prices': list_calculated_prices,
'credit_notes': credit_notes, 'percentage_credit_notes': percentage_credit_notes,
'list_labels': list_labels, 'delete_transporter_file_form': DeleteTransporterFileForm(),
'list_transporters_pk_slug': list_transporters_pk_slug})
files = request.FILES.getlist('file')
transporter_name = request.POST['transporter']
transporter = Transporter.objects.get(name=transporter_name)
random_id = get_random_string(length=32,
allowed_chars='abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789')
tmp_folder = f'{random_id}'
file_paths = []
if len(files) > 1:
for file in files:
file_path = default_storage.save(file.name, file)
file_paths.append(file_path)
else:
file_path = default_storage.save(files[0].name, files[0])
file_paths.append(file_path)
La première solution que j'ai testée est de sauvegarder localement les fichiers puis de passer les chemins d'accès à ma tâche Celery :
#views.py
prep_import = prep_import_scaleway.delay(file_path=file_paths, tmp_folder=tmp_folder)
#tasks.py
def prep_import_scaleway(file_path, tmp_folder):
# The objective of this function is to prepare the import of the file to Scaleway. We stock files in a tmp folder
if len(file_path) > 1:
for file in file_path:
key = f'tmp/{tmp_folder}/{file}'
#We convert the file to a BytesIO object from JSON
with default_storage.open(file, 'rb') as f:
file_t = io.BytesIO(f.read())
scaleway = Scaleway(region_name=settings.SCALEWAY_S3_REGION_NAME,
endpoint_url=settings.SCALEWAY_S3_ENDPOINT_URL,
access_key=settings.SCALEWAY_ACCESS_KEY_ID,
secret_key=settings.SCALEWAY_SECRET_ACCESS_KEY)
scaleway.upload_file_scaleway(file=file_t, key=key, bucket=settings.SCALEWAY_STORAGE_BUCKET_NAME)
#We delete the file from the server
default_storage.delete(file)
print('ok')
print(f'Key is {key}')
else:
file_path = file_path[0]
key = f'tmp/{tmp_folder}/{file_path}'
#We convert the file to a BytesIO object from JSON
with default_storage.open(file_path, 'rb') as f:
file_t = io.BytesIO(f.read())
scaleway = Scaleway(region_name=settings.SCALEWAY_S3_REGION_NAME,
endpoint_url=settings.SCALEWAY_S3_ENDPOINT_URL,
access_key=settings.SCALEWAY_ACCESS_KEY_ID,
secret_key=settings.SCALEWAY_SECRET_ACCESS_KEY)
scaleway.upload_file_scaleway(file=file_t, key=key, bucket=settings.SCALEWAY_STORAGE_BUCKET_NAME)
#We delete the file from the server
default_storage.delete(file_path)
print('ok')
print(f'Key is {key}')
Mais je me tape encore le processus d'upload et cela génère un temps d'attente. Mon objectif est de passer la liste de fichiers et de rendre la vue directement dès la soumission du formulaire.
Malheureusement, on ne peut pas passer de fichiers directement à tâche Celery car les fichiers ne peuvent pas être sérialisés en JSON. Dans ma configuration actuelle, j'attends que les tâches finissent d'être traitées avant d'exécuter les autres. Par exemple :
prep_import = prep_import_scaleway.delay(file_path=file_paths, tmp_folder=tmp_folder)
while not prep_import.ready():
time.sleep(1)
print("Waiting for prep_import to be done")
task_id = handle_import_files_to_scaleway(transporter=transporter_name,
company=request.user.company.pk,
user=request.user.pk,
report=report.pk, temp_key=tmp_folder)
key = AsyncResult(task_id).get()
import_chronopost = handle_chronopost_importer.delay(key=key, transporter=transporter_name,
company_name=request.user.company.name,
report=report.pk)
while not import_chronopost.ready():
time.sleep(1)
print("Waiting for import to be done")
send_mail(
template_id=5,
to=request.user.email,
email=request.user.email,
first_name=request.user.first_name,
last_name=request.user.last_name,
url=request.build_absolute_uri(),
transporter=transporter_name,
)
Le seul workaround que j'ai trouvé est de changer :
CELERY_ACCEPT_CONTENT = ['pickle']
Mais a priori, cela pose un problème en matière de sécurité, cf la doc.
Même Azure me mettait un message d'alerte :
Running a worker with superuser privileges when the
2023-02-10T13:32:19.063521617Z worker accepts messages serialized with pickle is a very bad idea!
Est-ce que vous auriez une idée de comment gérer ce processus en arrière-plan pour gérer l'upload ?
Merci d'avance !
Vincent
Inscris-toi
(c'est gratuit !)
Tu dois créer un compte pour participer aux discussions.
Créer un compte