From 06cce79806fccefa29f7145a7421ca8d122f6dbe Mon Sep 17 00:00:00 2001 From: FreddleSpl0it Date: Mon, 12 Jun 2023 16:37:48 +0200 Subject: [PATCH 1/3] [Dockerapi] add pubsub handler for broadcasting in ha setup --- data/Dockerfiles/dockerapi/Dockerfile | 7 +- .../dockerapi/docker-entrypoint.sh | 2 +- data/Dockerfiles/dockerapi/dockerapi.py | 549 ------------------ data/Dockerfiles/dockerapi/main.py | 260 +++++++++ .../dockerapi/modules/DockerApi.py | 486 ++++++++++++++++ .../Dockerfiles/dockerapi/modules/__init__.py | 0 data/web/inc/functions.docker.inc.php | 11 + data/web/inc/functions.mailbox.inc.php | 9 +- 8 files changed, 764 insertions(+), 560 deletions(-) delete mode 100644 data/Dockerfiles/dockerapi/dockerapi.py create mode 100644 data/Dockerfiles/dockerapi/main.py create mode 100644 data/Dockerfiles/dockerapi/modules/DockerApi.py create mode 100644 data/Dockerfiles/dockerapi/modules/__init__.py diff --git a/data/Dockerfiles/dockerapi/Dockerfile b/data/Dockerfiles/dockerapi/Dockerfile index fce4dde8..3431f939 100644 --- a/data/Dockerfiles/dockerapi/Dockerfile +++ b/data/Dockerfiles/dockerapi/Dockerfile @@ -14,9 +14,12 @@ RUN apk add --update --no-cache python3 \ uvicorn \ aiodocker \ docker \ - redis + aioredis +RUN mkdir /app/modules COPY docker-entrypoint.sh /app/ -COPY dockerapi.py /app/ +COPY main.py /app/main.py +COPY modules/ /app/modules/ ENTRYPOINT ["/bin/sh", "/app/docker-entrypoint.sh"] +CMD exec python main.py \ No newline at end of file diff --git a/data/Dockerfiles/dockerapi/docker-entrypoint.sh b/data/Dockerfiles/dockerapi/docker-entrypoint.sh index aab6cd51..64f4b829 100755 --- a/data/Dockerfiles/dockerapi/docker-entrypoint.sh +++ b/data/Dockerfiles/dockerapi/docker-entrypoint.sh @@ -6,4 +6,4 @@ -subj /CN=dockerapi/O=mailcow \ -addext subjectAltName=DNS:dockerapi` -`uvicorn --host 0.0.0.0 --port 443 --ssl-certfile=/app/dockerapi_cert.pem --ssl-keyfile=/app/dockerapi_key.pem dockerapi:app` +exec "$@" diff --git a/data/Dockerfiles/dockerapi/dockerapi.py b/data/Dockerfiles/dockerapi/dockerapi.py deleted file mode 100644 index 7edb2e08..00000000 --- a/data/Dockerfiles/dockerapi/dockerapi.py +++ /dev/null @@ -1,549 +0,0 @@ -from fastapi import FastAPI, Response, Request -import aiodocker -import docker -import psutil -import sys -import re -import time -import os -import json -import asyncio -import redis -import platform -from datetime import datetime -import logging -from logging.config import dictConfig - - -log_config = { - "version": 1, - "disable_existing_loggers": False, - "formatters": { - "default": { - "()": "uvicorn.logging.DefaultFormatter", - "fmt": "%(levelprefix)s %(asctime)s %(message)s", - "datefmt": "%Y-%m-%d %H:%M:%S", - - }, - }, - "handlers": { - "default": { - "formatter": "default", - "class": "logging.StreamHandler", - "stream": "ext://sys.stderr", - }, - }, - "loggers": { - "api-logger": {"handlers": ["default"], "level": "INFO"}, - }, -} -dictConfig(log_config) - -containerIds_to_update = [] -host_stats_isUpdating = False -app = FastAPI() -logger = logging.getLogger('api-logger') - - -@app.get("/host/stats") -async def get_host_update_stats(): - global host_stats_isUpdating - - if host_stats_isUpdating == False: - asyncio.create_task(get_host_stats()) - host_stats_isUpdating = True - - while True: - if redis_client.exists('host_stats'): - break - await asyncio.sleep(1.5) - - - stats = json.loads(redis_client.get('host_stats')) - return Response(content=json.dumps(stats, indent=4), media_type="application/json") - -@app.get("/containers/{container_id}/json") -async def get_container(container_id : str): - if container_id and container_id.isalnum(): - try: - for container in (await async_docker_client.containers.list()): - if container._id == container_id: - container_info = await container.show() - return Response(content=json.dumps(container_info, indent=4), media_type="application/json") - - res = { - "type": "danger", - "msg": "no container found" - } - return Response(content=json.dumps(res, indent=4), media_type="application/json") - except Exception as e: - res = { - "type": "danger", - "msg": str(e) - } - return Response(content=json.dumps(res, indent=4), media_type="application/json") - else: - res = { - "type": "danger", - "msg": "no or invalid id defined" - } - return Response(content=json.dumps(res, indent=4), media_type="application/json") - -@app.get("/containers/json") -async def get_containers(): - containers = {} - try: - for container in (await async_docker_client.containers.list()): - container_info = await container.show() - containers.update({container_info['Id']: container_info}) - return Response(content=json.dumps(containers, indent=4), media_type="application/json") - except Exception as e: - res = { - "type": "danger", - "msg": str(e) - } - return Response(content=json.dumps(res, indent=4), media_type="application/json") - -@app.post("/containers/{container_id}/{post_action}") -async def post_containers(container_id : str, post_action : str, request: Request): - try : - request_json = await request.json() - except Exception as err: - request_json = {} - - if container_id and container_id.isalnum() and post_action: - try: - """Dispatch container_post api call""" - if post_action == 'exec': - if not request_json or not 'cmd' in request_json: - res = { - "type": "danger", - "msg": "cmd is missing" - } - return Response(content=json.dumps(res, indent=4), media_type="application/json") - if not request_json or not 'task' in request_json: - res = { - "type": "danger", - "msg": "task is missing" - } - return Response(content=json.dumps(res, indent=4), media_type="application/json") - - api_call_method_name = '__'.join(['container_post', str(post_action), str(request_json['cmd']), str(request_json['task']) ]) - else: - api_call_method_name = '__'.join(['container_post', str(post_action) ]) - - docker_utils = DockerUtils(sync_docker_client) - api_call_method = getattr(docker_utils, api_call_method_name, lambda container_id: Response(content=json.dumps({'type': 'danger', 'msg':'container_post - unknown api call' }, indent=4), media_type="application/json")) - - - logger.info("api call: %s, container_id: %s" % (api_call_method_name, container_id)) - return api_call_method(container_id, request_json) - except Exception as e: - logger.error("error - container_post: %s" % str(e)) - res = { - "type": "danger", - "msg": str(e) - } - return Response(content=json.dumps(res, indent=4), media_type="application/json") - - else: - res = { - "type": "danger", - "msg": "invalid container id or missing action" - } - return Response(content=json.dumps(res, indent=4), media_type="application/json") - -@app.post("/container/{container_id}/stats/update") -async def post_container_update_stats(container_id : str): - global containerIds_to_update - - # start update task for container if no task is running - if container_id not in containerIds_to_update: - asyncio.create_task(get_container_stats(container_id)) - containerIds_to_update.append(container_id) - - while True: - if redis_client.exists(container_id + '_stats'): - break - await asyncio.sleep(1.5) - - stats = json.loads(redis_client.get(container_id + '_stats')) - return Response(content=json.dumps(stats, indent=4), media_type="application/json") - - - - -class DockerUtils: - def __init__(self, docker_client): - self.docker_client = docker_client - - # api call: container_post - post_action: stop - def container_post__stop(self, container_id, request_json): - for container in self.docker_client.containers.list(all=True, filters={"id": container_id}): - container.stop() - - res = { 'type': 'success', 'msg': 'command completed successfully'} - return Response(content=json.dumps(res, indent=4), media_type="application/json") - # api call: container_post - post_action: start - def container_post__start(self, container_id, request_json): - for container in self.docker_client.containers.list(all=True, filters={"id": container_id}): - container.start() - - res = { 'type': 'success', 'msg': 'command completed successfully'} - return Response(content=json.dumps(res, indent=4), media_type="application/json") - # api call: container_post - post_action: restart - def container_post__restart(self, container_id, request_json): - for container in self.docker_client.containers.list(all=True, filters={"id": container_id}): - container.restart() - - res = { 'type': 'success', 'msg': 'command completed successfully'} - return Response(content=json.dumps(res, indent=4), media_type="application/json") - # api call: container_post - post_action: top - def container_post__top(self, container_id, request_json): - for container in self.docker_client.containers.list(all=True, filters={"id": container_id}): - res = { 'type': 'success', 'msg': container.top()} - return Response(content=json.dumps(res, indent=4), media_type="application/json") - # api call: container_post - post_action: stats - def container_post__stats(self, container_id, request_json): - for container in self.docker_client.containers.list(all=True, filters={"id": container_id}): - for stat in container.stats(decode=True, stream=True): - res = { 'type': 'success', 'msg': stat} - return Response(content=json.dumps(res, indent=4), media_type="application/json") - - # api call: container_post - post_action: exec - cmd: mailq - task: delete - def container_post__exec__mailq__delete(self, container_id, request_json): - if 'items' in request_json: - r = re.compile("^[0-9a-fA-F]+$") - filtered_qids = filter(r.match, request_json['items']) - if filtered_qids: - flagged_qids = ['-d %s' % i for i in filtered_qids] - sanitized_string = str(' '.join(flagged_qids)); - for container in self.docker_client.containers.list(filters={"id": container_id}): - postsuper_r = container.exec_run(["/bin/bash", "-c", "/usr/sbin/postsuper " + sanitized_string]) - return exec_run_handler('generic', postsuper_r) - - - # api call: container_post - post_action: exec - cmd: mailq - task: hold - def container_post__exec__mailq__hold(self, container_id, request_json): - if 'items' in request_json: - r = re.compile("^[0-9a-fA-F]+$") - filtered_qids = filter(r.match, request_json['items']) - if filtered_qids: - flagged_qids = ['-h %s' % i for i in filtered_qids] - sanitized_string = str(' '.join(flagged_qids)); - for container in self.docker_client.containers.list(filters={"id": container_id}): - postsuper_r = container.exec_run(["/bin/bash", "-c", "/usr/sbin/postsuper " + sanitized_string]) - return exec_run_handler('generic', postsuper_r) - - # api call: container_post - post_action: exec - cmd: mailq - task: cat - def container_post__exec__mailq__cat(self, container_id, request_json): - if 'items' in request_json: - r = re.compile("^[0-9a-fA-F]+$") - filtered_qids = filter(r.match, request_json['items']) - if filtered_qids: - sanitized_string = str(' '.join(filtered_qids)); - - for container in self.docker_client.containers.list(filters={"id": container_id}): - postcat_return = container.exec_run(["/bin/bash", "-c", "/usr/sbin/postcat -q " + sanitized_string], user='postfix') - if not postcat_return: - postcat_return = 'err: invalid' - return exec_run_handler('utf8_text_only', postcat_return) - - # api call: container_post - post_action: exec - cmd: mailq - task: unhold - def container_post__exec__mailq__unhold(self, container_id, request_json): - if 'items' in request_json: - r = re.compile("^[0-9a-fA-F]+$") - filtered_qids = filter(r.match, request_json['items']) - if filtered_qids: - flagged_qids = ['-H %s' % i for i in filtered_qids] - sanitized_string = str(' '.join(flagged_qids)); - for container in self.docker_client.containers.list(filters={"id": container_id}): - postsuper_r = container.exec_run(["/bin/bash", "-c", "/usr/sbin/postsuper " + sanitized_string]) - return exec_run_handler('generic', postsuper_r) - - # api call: container_post - post_action: exec - cmd: mailq - task: deliver - def container_post__exec__mailq__deliver(self, container_id, request_json): - if 'items' in request_json: - r = re.compile("^[0-9a-fA-F]+$") - filtered_qids = filter(r.match, request_json['items']) - if filtered_qids: - flagged_qids = ['-i %s' % i for i in filtered_qids] - for container in self.docker_client.containers.list(filters={"id": container_id}): - for i in flagged_qids: - postqueue_r = container.exec_run(["/bin/bash", "-c", "/usr/sbin/postqueue " + i], user='postfix') - # todo: check each exit code - res = { 'type': 'success', 'msg': 'Scheduled immediate delivery'} - return Response(content=json.dumps(res, indent=4), media_type="application/json") - - # api call: container_post - post_action: exec - cmd: mailq - task: list - def container_post__exec__mailq__list(self, container_id, request_json): - for container in self.docker_client.containers.list(filters={"id": container_id}): - mailq_return = container.exec_run(["/usr/sbin/postqueue", "-j"], user='postfix') - return exec_run_handler('utf8_text_only', mailq_return) - # api call: container_post - post_action: exec - cmd: mailq - task: flush - def container_post__exec__mailq__flush(self, container_id, request_json): - for container in self.docker_client.containers.list(filters={"id": container_id}): - postqueue_r = container.exec_run(["/usr/sbin/postqueue", "-f"], user='postfix') - return exec_run_handler('generic', postqueue_r) - # api call: container_post - post_action: exec - cmd: mailq - task: super_delete - def container_post__exec__mailq__super_delete(self, container_id, request_json): - for container in self.docker_client.containers.list(filters={"id": container_id}): - postsuper_r = container.exec_run(["/usr/sbin/postsuper", "-d", "ALL"]) - return exec_run_handler('generic', postsuper_r) - # api call: container_post - post_action: exec - cmd: system - task: fts_rescan - def container_post__exec__system__fts_rescan(self, container_id, request_json): - if 'username' in request_json: - for container in self.docker_client.containers.list(filters={"id": container_id}): - rescan_return = container.exec_run(["/bin/bash", "-c", "/usr/bin/doveadm fts rescan -u '" + request_json['username'].replace("'", "'\\''") + "'"], user='vmail') - if rescan_return.exit_code == 0: - res = { 'type': 'success', 'msg': 'fts_rescan: rescan triggered'} - return Response(content=json.dumps(res, indent=4), media_type="application/json") - else: - res = { 'type': 'warning', 'msg': 'fts_rescan error'} - return Response(content=json.dumps(res, indent=4), media_type="application/json") - if 'all' in request_json: - for container in self.docker_client.containers.list(filters={"id": container_id}): - rescan_return = container.exec_run(["/bin/bash", "-c", "/usr/bin/doveadm fts rescan -A"], user='vmail') - if rescan_return.exit_code == 0: - res = { 'type': 'success', 'msg': 'fts_rescan: rescan triggered'} - return Response(content=json.dumps(res, indent=4), media_type="application/json") - else: - res = { 'type': 'warning', 'msg': 'fts_rescan error'} - return Response(content=json.dumps(res, indent=4), media_type="application/json") - # api call: container_post - post_action: exec - cmd: system - task: df - def container_post__exec__system__df(self, container_id, request_json): - if 'dir' in request_json: - for container in self.docker_client.containers.list(filters={"id": container_id}): - df_return = container.exec_run(["/bin/bash", "-c", "/bin/df -H '" + request_json['dir'].replace("'", "'\\''") + "' | /usr/bin/tail -n1 | /usr/bin/tr -s [:blank:] | /usr/bin/tr ' ' ','"], user='nobody') - if df_return.exit_code == 0: - return df_return.output.decode('utf-8').rstrip() - else: - return "0,0,0,0,0,0" - # api call: container_post - post_action: exec - cmd: system - task: mysql_upgrade - def container_post__exec__system__mysql_upgrade(self, container_id, request_json): - for container in self.docker_client.containers.list(filters={"id": container_id}): - sql_return = container.exec_run(["/bin/bash", "-c", "/usr/bin/mysql_upgrade -uroot -p'" + os.environ['DBROOT'].replace("'", "'\\''") + "'\n"], user='mysql') - if sql_return.exit_code == 0: - matched = False - for line in sql_return.output.decode('utf-8').split("\n"): - if 'is already upgraded to' in line: - matched = True - if matched: - res = { 'type': 'success', 'msg':'mysql_upgrade: already upgraded', 'text': sql_return.output.decode('utf-8')} - return Response(content=json.dumps(res, indent=4), media_type="application/json") - else: - container.restart() - res = { 'type': 'warning', 'msg':'mysql_upgrade: upgrade was applied', 'text': sql_return.output.decode('utf-8')} - return Response(content=json.dumps(res, indent=4), media_type="application/json") - else: - res = { 'type': 'error', 'msg': 'mysql_upgrade: error running command', 'text': sql_return.output.decode('utf-8')} - return Response(content=json.dumps(res, indent=4), media_type="application/json") - # api call: container_post - post_action: exec - cmd: system - task: mysql_tzinfo_to_sql - def container_post__exec__system__mysql_tzinfo_to_sql(self, container_id, request_json): - for container in self.docker_client.containers.list(filters={"id": container_id}): - sql_return = container.exec_run(["/bin/bash", "-c", "/usr/bin/mysql_tzinfo_to_sql /usr/share/zoneinfo | /bin/sed 's/Local time zone must be set--see zic manual page/FCTY/' | /usr/bin/mysql -uroot -p'" + os.environ['DBROOT'].replace("'", "'\\''") + "' mysql \n"], user='mysql') - if sql_return.exit_code == 0: - res = { 'type': 'info', 'msg': 'mysql_tzinfo_to_sql: command completed successfully', 'text': sql_return.output.decode('utf-8')} - return Response(content=json.dumps(res, indent=4), media_type="application/json") - else: - res = { 'type': 'error', 'msg': 'mysql_tzinfo_to_sql: error running command', 'text': sql_return.output.decode('utf-8')} - return Response(content=json.dumps(res, indent=4), media_type="application/json") - # api call: container_post - post_action: exec - cmd: reload - task: dovecot - def container_post__exec__reload__dovecot(self, container_id, request_json): - for container in self.docker_client.containers.list(filters={"id": container_id}): - reload_return = container.exec_run(["/bin/bash", "-c", "/usr/sbin/dovecot reload"]) - return exec_run_handler('generic', reload_return) - # api call: container_post - post_action: exec - cmd: reload - task: postfix - def container_post__exec__reload__postfix(self, container_id, request_json): - for container in self.docker_client.containers.list(filters={"id": container_id}): - reload_return = container.exec_run(["/bin/bash", "-c", "/usr/sbin/postfix reload"]) - return exec_run_handler('generic', reload_return) - # api call: container_post - post_action: exec - cmd: reload - task: nginx - def container_post__exec__reload__nginx(self, container_id, request_json): - for container in self.docker_client.containers.list(filters={"id": container_id}): - reload_return = container.exec_run(["/bin/sh", "-c", "/usr/sbin/nginx -s reload"]) - return exec_run_handler('generic', reload_return) - # api call: container_post - post_action: exec - cmd: sieve - task: list - def container_post__exec__sieve__list(self, container_id, request_json): - if 'username' in request_json: - for container in self.docker_client.containers.list(filters={"id": container_id}): - sieve_return = container.exec_run(["/bin/bash", "-c", "/usr/bin/doveadm sieve list -u '" + request_json['username'].replace("'", "'\\''") + "'"]) - return exec_run_handler('utf8_text_only', sieve_return) - # api call: container_post - post_action: exec - cmd: sieve - task: print - def container_post__exec__sieve__print(self, container_id, request_json): - if 'username' in request_json and 'script_name' in request_json: - for container in self.docker_client.containers.list(filters={"id": container_id}): - cmd = ["/bin/bash", "-c", "/usr/bin/doveadm sieve get -u '" + request_json['username'].replace("'", "'\\''") + "' '" + request_json['script_name'].replace("'", "'\\''") + "'"] - sieve_return = container.exec_run(cmd) - return exec_run_handler('utf8_text_only', sieve_return) - # api call: container_post - post_action: exec - cmd: maildir - task: cleanup - def container_post__exec__maildir__cleanup(self, container_id, request_json): - if 'maildir' in request_json: - for container in self.docker_client.containers.list(filters={"id": container_id}): - sane_name = re.sub(r'\W+', '', request_json['maildir']) - vmail_name = request_json['maildir'].replace("'", "'\\''") - cmd_vmail = "if [[ -d '/var/vmail/" + vmail_name + "' ]]; then /bin/mv '/var/vmail/" + vmail_name + "' '/var/vmail/_garbage/" + str(int(time.time())) + "_" + sane_name + "'; fi" - index_name = request_json['maildir'].split("/") - if len(index_name) > 1: - index_name = index_name[1].replace("'", "'\\''") + "@" + index_name[0].replace("'", "'\\''") - cmd_vmail_index = "if [[ -d '/var/vmail_index/" + index_name + "' ]]; then /bin/mv '/var/vmail_index/" + index_name + "' '/var/vmail/_garbage/" + str(int(time.time())) + "_" + sane_name + "_index'; fi" - cmd = ["/bin/bash", "-c", cmd_vmail + " && " + cmd_vmail_index] - else: - cmd = ["/bin/bash", "-c", cmd_vmail] - maildir_cleanup = container.exec_run(cmd, user='vmail') - return exec_run_handler('generic', maildir_cleanup) - # api call: container_post - post_action: exec - cmd: rspamd - task: worker_password - def container_post__exec__rspamd__worker_password(self, container_id, request_json): - if 'raw' in request_json: - for container in self.docker_client.containers.list(filters={"id": container_id}): - cmd = "/usr/bin/rspamadm pw -e -p '" + request_json['raw'].replace("'", "'\\''") + "' 2> /dev/null" - cmd_response = exec_cmd_container(container, cmd, user="_rspamd") - - matched = False - for line in cmd_response.split("\n"): - if '$2$' in line: - hash = line.strip() - hash_out = re.search('\$2\$.+$', hash).group(0) - rspamd_passphrase_hash = re.sub('[^0-9a-zA-Z\$]+', '', hash_out.rstrip()) - rspamd_password_filename = "/etc/rspamd/override.d/worker-controller-password.inc" - cmd = '''/bin/echo 'enable_password = "%s";' > %s && cat %s''' % (rspamd_passphrase_hash, rspamd_password_filename, rspamd_password_filename) - cmd_response = exec_cmd_container(container, cmd, user="_rspamd") - if rspamd_passphrase_hash.startswith("$2$") and rspamd_passphrase_hash in cmd_response: - container.restart() - matched = True - if matched: - res = { 'type': 'success', 'msg': 'command completed successfully' } - logger.info('success changing Rspamd password') - return Response(content=json.dumps(res, indent=4), media_type="application/json") - else: - logger.error('failed changing Rspamd password') - res = { 'type': 'danger', 'msg': 'command did not complete' } - return Response(content=json.dumps(res, indent=4), media_type="application/json") - - -def exec_cmd_container(container, cmd, user, timeout=2, shell_cmd="/bin/bash"): - - def recv_socket_data(c_socket, timeout): - c_socket.setblocking(0) - total_data=[] - data='' - begin=time.time() - while True: - if total_data and time.time()-begin > timeout: - break - elif time.time()-begin > timeout*2: - break - try: - data = c_socket.recv(8192) - if data: - total_data.append(data.decode('utf-8')) - #change the beginning time for measurement - begin=time.time() - else: - #sleep for sometime to indicate a gap - time.sleep(0.1) - break - except: - pass - return ''.join(total_data) - - - try : - socket = container.exec_run([shell_cmd], stdin=True, socket=True, user=user).output._sock - if not cmd.endswith("\n"): - cmd = cmd + "\n" - socket.send(cmd.encode('utf-8')) - data = recv_socket_data(socket, timeout) - socket.close() - return data - except Exception as e: - logger.error("error - exec_cmd_container: %s" % str(e)) - traceback.print_exc(file=sys.stdout) -def exec_run_handler(type, output): - if type == 'generic': - if output.exit_code == 0: - res = { 'type': 'success', 'msg': 'command completed successfully' } - return Response(content=json.dumps(res, indent=4), media_type="application/json") - else: - res = { 'type': 'danger', 'msg': 'command failed: ' + output.output.decode('utf-8') } - return Response(content=json.dumps(res, indent=4), media_type="application/json") - if type == 'utf8_text_only': - return Response(content=output.output.decode('utf-8'), media_type="text/plain") - -async def get_host_stats(wait=5): - global host_stats_isUpdating - - try: - system_time = datetime.now() - host_stats = { - "cpu": { - "cores": psutil.cpu_count(), - "usage": psutil.cpu_percent() - }, - "memory": { - "total": psutil.virtual_memory().total, - "usage": psutil.virtual_memory().percent, - "swap": psutil.swap_memory() - }, - "uptime": time.time() - psutil.boot_time(), - "system_time": system_time.strftime("%d.%m.%Y %H:%M:%S"), - "architecture": platform.machine() - } - - redis_client.set('host_stats', json.dumps(host_stats), ex=10) - except Exception as e: - res = { - "type": "danger", - "msg": str(e) - } - - await asyncio.sleep(wait) - host_stats_isUpdating = False - -async def get_container_stats(container_id, wait=5, stop=False): - global containerIds_to_update - - if container_id and container_id.isalnum(): - try: - for container in (await async_docker_client.containers.list()): - if container._id == container_id: - res = await container.stats(stream=False) - - if redis_client.exists(container_id + '_stats'): - stats = json.loads(redis_client.get(container_id + '_stats')) - else: - stats = [] - stats.append(res[0]) - if len(stats) > 3: - del stats[0] - redis_client.set(container_id + '_stats', json.dumps(stats), ex=60) - except Exception as e: - res = { - "type": "danger", - "msg": str(e) - } - else: - res = { - "type": "danger", - "msg": "no or invalid id defined" - } - - await asyncio.sleep(wait) - if stop == True: - # update task was called second time, stop - containerIds_to_update.remove(container_id) - else: - # call update task a second time - await get_container_stats(container_id, wait=0, stop=True) - - - -if os.environ['REDIS_SLAVEOF_IP'] != "": - redis_client = redis.Redis(host=os.environ['REDIS_SLAVEOF_IP'], port=os.environ['REDIS_SLAVEOF_PORT'], db=0) -else: - redis_client = redis.Redis(host='redis-mailcow', port=6379, db=0) - -sync_docker_client = docker.DockerClient(base_url='unix://var/run/docker.sock', version='auto') -async_docker_client = aiodocker.Docker(url='unix:///var/run/docker.sock') - -logger.info('DockerApi started') diff --git a/data/Dockerfiles/dockerapi/main.py b/data/Dockerfiles/dockerapi/main.py new file mode 100644 index 00000000..59d1a8ad --- /dev/null +++ b/data/Dockerfiles/dockerapi/main.py @@ -0,0 +1,260 @@ +import os +import sys +import uvicorn +import json +import uuid +import async_timeout +import asyncio +import aioredis +import aiodocker +import docker +import logging +from logging.config import dictConfig +from fastapi import FastAPI, Response, Request +from modules.DockerApi import DockerApi + +dockerapi = None +app = FastAPI() + +# Define Routes +@app.get("/host/stats") +async def get_host_update_stats(): + global dockerapi + + if dockerapi.host_stats_isUpdating == False: + asyncio.create_task(dockerapi.get_host_stats()) + dockerapi.host_stats_isUpdating = True + + while True: + if await dockerapi.redis_client.exists('host_stats'): + break + await asyncio.sleep(1.5) + + stats = json.loads(await dockerapi.redis_client.get('host_stats')) + return Response(content=json.dumps(stats, indent=4), media_type="application/json") + +@app.get("/containers/{container_id}/json") +async def get_container(container_id : str): + global dockerapi + + if container_id and container_id.isalnum(): + try: + for container in (await dockerapi.async_docker_client.containers.list()): + if container._id == container_id: + container_info = await container.show() + return Response(content=json.dumps(container_info, indent=4), media_type="application/json") + + res = { + "type": "danger", + "msg": "no container found" + } + return Response(content=json.dumps(res, indent=4), media_type="application/json") + except Exception as e: + res = { + "type": "danger", + "msg": str(e) + } + return Response(content=json.dumps(res, indent=4), media_type="application/json") + else: + res = { + "type": "danger", + "msg": "no or invalid id defined" + } + return Response(content=json.dumps(res, indent=4), media_type="application/json") + +@app.get("/containers/json") +async def get_containers(): + global dockerapi + + containers = {} + try: + for container in (await dockerapi.async_docker_client.containers.list()): + container_info = await container.show() + containers.update({container_info['Id']: container_info}) + return Response(content=json.dumps(containers, indent=4), media_type="application/json") + except Exception as e: + res = { + "type": "danger", + "msg": str(e) + } + return Response(content=json.dumps(res, indent=4), media_type="application/json") + +@app.post("/containers/{container_id}/{post_action}") +async def post_containers(container_id : str, post_action : str, request: Request): + global dockerapi + + try : + request_json = await request.json() + except Exception as err: + request_json = {} + + if container_id and container_id.isalnum() and post_action: + try: + """Dispatch container_post api call""" + if post_action == 'exec': + if not request_json or not 'cmd' in request_json: + res = { + "type": "danger", + "msg": "cmd is missing" + } + return Response(content=json.dumps(res, indent=4), media_type="application/json") + if not request_json or not 'task' in request_json: + res = { + "type": "danger", + "msg": "task is missing" + } + return Response(content=json.dumps(res, indent=4), media_type="application/json") + + api_call_method_name = '__'.join(['container_post', str(post_action), str(request_json['cmd']), str(request_json['task']) ]) + else: + api_call_method_name = '__'.join(['container_post', str(post_action) ]) + + api_call_method = getattr(dockerapi, api_call_method_name, lambda container_id: Response(content=json.dumps({'type': 'danger', 'msg':'container_post - unknown api call' }, indent=4), media_type="application/json")) + + dockerapi.logger.info("api call: %s, container_id: %s" % (api_call_method_name, container_id)) + return api_call_method(request_json, container_id=container_id) + except Exception as e: + dockerapi.logger.error("error - container_post: %s" % str(e)) + res = { + "type": "danger", + "msg": str(e) + } + return Response(content=json.dumps(res, indent=4), media_type="application/json") + + else: + res = { + "type": "danger", + "msg": "invalid container id or missing action" + } + return Response(content=json.dumps(res, indent=4), media_type="application/json") + +@app.post("/container/{container_id}/stats/update") +async def post_container_update_stats(container_id : str): + global dockerapi + + # start update task for container if no task is running + if container_id not in dockerapi.containerIds_to_update: + asyncio.create_task(dockerapi.get_container_stats(container_id)) + dockerapi.containerIds_to_update.append(container_id) + + while True: + if await dockerapi.redis_client.exists(container_id + '_stats'): + break + await asyncio.sleep(1.5) + + stats = json.loads(await dockerapi.redis_client.get(container_id + '_stats')) + return Response(content=json.dumps(stats, indent=4), media_type="application/json") + +# Events +@app.on_event("startup") +async def startup_event(): + global dockerapi + + # Initialize a custom logger + logger = logging.getLogger("dockerapi") + logger.setLevel(logging.INFO) + # Configure the logger to output logs to the terminal + handler = logging.StreamHandler() + handler.setLevel(logging.INFO) + formatter = logging.Formatter("%(levelname)s: %(message)s") + handler.setFormatter(formatter) + logger.addHandler(handler) + + logger.info("Init APP") + + # Init redis client + if os.environ['REDIS_SLAVEOF_IP'] != "": + redis_client = redis = await aioredis.from_url(f"redis://{os.environ['REDIS_SLAVEOF_IP']}:{os.environ['REDIS_SLAVEOF_PORT']}/0") + else: + redis_client = redis = await aioredis.from_url("redis://redis-mailcow:6379/0") + + # Init docker clients + sync_docker_client = docker.DockerClient(base_url='unix://var/run/docker.sock', version='auto') + async_docker_client = aiodocker.Docker(url='unix:///var/run/docker.sock') + + dockerapi = DockerApi(redis_client, sync_docker_client, async_docker_client, logger) + + logger.info("Subscribe to redis channel") + # Subscribe to redis channel + dockerapi.pubsub = redis.pubsub() + await dockerapi.pubsub.subscribe("MC_CHANNEL") + asyncio.create_task(handle_pubsub_messages(dockerapi.pubsub)) + +@app.on_event("shutdown") +async def shutdown_event(): + global dockerapi + + # Close docker connections + dockerapi.sync_docker_client.close() + await dockerapi.async_docker_client.close() + + # Close redis + await dockerapi.pubsub.unsubscribe("MC_CHANNEL") + await dockerapi.redis_client.close() + +# PubSub Handler +async def handle_pubsub_messages(channel: aioredis.client.PubSub): + global dockerapi + + while True: + try: + async with async_timeout.timeout(1): + message = await channel.get_message(ignore_subscribe_messages=True) + if message is not None: + # Parse message + data_json = json.loads(message['data'].decode('utf-8')) + dockerapi.logger.info(f"PubSub Received - {json.dumps(data_json)}") + + # Handle api_call + if 'api_call' in data_json: + # api_call: container_post + if data_json['api_call'] == "container_post": + if 'post_action' in data_json and 'container_name' in data_json: + try: + """Dispatch container_post api call""" + request_json = {} + if data_json['post_action'] == 'exec': + if 'request' in data_json: + request_json = data_json['request'] + if 'cmd' in request_json: + if 'task' in request_json: + api_call_method_name = '__'.join(['container_post', str(data_json['post_action']), str(request_json['cmd']), str(request_json['task']) ]) + else: + dockerapi.logger.error("api call: task missing") + else: + dockerapi.logger.error("api call: cmd missing") + else: + dockerapi.logger.error("api call: request missing") + else: + api_call_method_name = '__'.join(['container_post', str(data_json['post_action'])]) + + if api_call_method_name: + api_call_method = getattr(dockerapi, api_call_method_name) + if api_call_method: + dockerapi.logger.info("api call: %s, container_name: %s" % (api_call_method_name, data_json['container_name'])) + api_call_method(request_json, container_name=data_json['container_name']) + else: + dockerapi.logger.error("api call not found: %s, container_name: %s" % (api_call_method_name, data_json['container_name'])) + except Exception as e: + dockerapi.logger.error("container_post: %s" % str(e)) + else: + dockerapi.logger.error("api call: missing container_name, post_action or request") + else: + dockerapi.logger.error("Unknwon PubSub recieved - %s" % json.dumps(data_json)) + else: + dockerapi.logger.error("Unknwon PubSub recieved - %s" % json.dumps(data_json)) + + await asyncio.sleep(0.01) + except asyncio.TimeoutError: + pass + +if __name__ == '__main__': + uvicorn.run( + app, + host="0.0.0.0", + port=443, + ssl_certfile="/app/dockerapi_cert.pem", + ssl_keyfile="/app/dockerapi_key.pem", + log_level="info", + loop="none" + ) diff --git a/data/Dockerfiles/dockerapi/modules/DockerApi.py b/data/Dockerfiles/dockerapi/modules/DockerApi.py new file mode 100644 index 00000000..6db77119 --- /dev/null +++ b/data/Dockerfiles/dockerapi/modules/DockerApi.py @@ -0,0 +1,486 @@ +import psutil +import sys +import re +import time +import json +import asyncio +import platform +from datetime import datetime +from fastapi import FastAPI, Response, Request + +class DockerApi: + def __init__(self, redis_client, sync_docker_client, async_docker_client, logger): + self.redis_client = redis_client + self.sync_docker_client = sync_docker_client + self.async_docker_client = async_docker_client + self.logger = logger + + self.host_stats_isUpdating = False + self.containerIds_to_update = [] + + # api call: container_post - post_action: stop + def container_post__stop(self, request_json, **kwargs): + if 'container_id' in kwargs: + filters = {"id": kwargs['container_id']} + elif 'container_name' in kwargs: + filters = {"name": kwargs['container_name']} + + for container in self.sync_docker_client.containers.list(all=True, filters=filters): + container.stop() + + res = { 'type': 'success', 'msg': 'command completed successfully'} + return Response(content=json.dumps(res, indent=4), media_type="application/json") + # api call: container_post - post_action: start + def container_post__start(self, request_json, **kwargs): + if 'container_id' in kwargs: + filters = {"id": kwargs['container_id']} + elif 'container_name' in kwargs: + filters = {"name": kwargs['container_name']} + + for container in self.sync_docker_client.containers.list(all=True, filters=filters): + container.start() + + res = { 'type': 'success', 'msg': 'command completed successfully'} + return Response(content=json.dumps(res, indent=4), media_type="application/json") + # api call: container_post - post_action: restart + def container_post__restart(self, request_json, **kwargs): + if 'container_id' in kwargs: + filters = {"id": kwargs['container_id']} + elif 'container_name' in kwargs: + filters = {"name": kwargs['container_name']} + + for container in self.sync_docker_client.containers.list(all=True, filters=filters): + container.restart() + + res = { 'type': 'success', 'msg': 'command completed successfully'} + return Response(content=json.dumps(res, indent=4), media_type="application/json") + # api call: container_post - post_action: top + def container_post__top(self, request_json, **kwargs): + if 'container_id' in kwargs: + filters = {"id": kwargs['container_id']} + elif 'container_name' in kwargs: + filters = {"name": kwargs['container_name']} + + for container in self.sync_docker_client.containers.list(all=True, filters=filters): + res = { 'type': 'success', 'msg': container.top()} + return Response(content=json.dumps(res, indent=4), media_type="application/json") + # api call: container_post - post_action: stats + def container_post__stats(self, request_json, **kwargs): + if 'container_id' in kwargs: + filters = {"id": kwargs['container_id']} + elif 'container_name' in kwargs: + filters = {"name": kwargs['container_name']} + + for container in self.sync_docker_client.containers.list(all=True, filters=filters): + for stat in container.stats(decode=True, stream=True): + res = { 'type': 'success', 'msg': stat} + return Response(content=json.dumps(res, indent=4), media_type="application/json") + # api call: container_post - post_action: exec - cmd: mailq - task: delete + def container_post__exec__mailq__delete(self, request_json, **kwargs): + if 'container_id' in kwargs: + filters = {"id": kwargs['container_id']} + elif 'container_name' in kwargs: + filters = {"name": kwargs['container_name']} + + if 'items' in request_json: + r = re.compile("^[0-9a-fA-F]+$") + filtered_qids = filter(r.match, request_json['items']) + if filtered_qids: + flagged_qids = ['-d %s' % i for i in filtered_qids] + sanitized_string = str(' '.join(flagged_qids)) + for container in self.sync_docker_client.containers.list(filters=filters): + postsuper_r = container.exec_run(["/bin/bash", "-c", "/usr/sbin/postsuper " + sanitized_string]) + return self.exec_run_handler('generic', postsuper_r) + # api call: container_post - post_action: exec - cmd: mailq - task: hold + def container_post__exec__mailq__hold(self, request_json, **kwargs): + if 'container_id' in kwargs: + filters = {"id": kwargs['container_id']} + elif 'container_name' in kwargs: + filters = {"name": kwargs['container_name']} + + if 'items' in request_json: + r = re.compile("^[0-9a-fA-F]+$") + filtered_qids = filter(r.match, request_json['items']) + if filtered_qids: + flagged_qids = ['-h %s' % i for i in filtered_qids] + sanitized_string = str(' '.join(flagged_qids)) + for container in self.sync_docker_client.containers.list(filters=filters): + postsuper_r = container.exec_run(["/bin/bash", "-c", "/usr/sbin/postsuper " + sanitized_string]) + return self.exec_run_handler('generic', postsuper_r) + # api call: container_post - post_action: exec - cmd: mailq - task: cat + def container_post__exec__mailq__cat(self, request_json, **kwargs): + if 'container_id' in kwargs: + filters = {"id": kwargs['container_id']} + elif 'container_name' in kwargs: + filters = {"name": kwargs['container_name']} + + if 'items' in request_json: + r = re.compile("^[0-9a-fA-F]+$") + filtered_qids = filter(r.match, request_json['items']) + if filtered_qids: + sanitized_string = str(' '.join(filtered_qids)) + + for container in self.sync_docker_client.containers.list(filters=filters): + postcat_return = container.exec_run(["/bin/bash", "-c", "/usr/sbin/postcat -q " + sanitized_string], user='postfix') + if not postcat_return: + postcat_return = 'err: invalid' + return self.exec_run_handler('utf8_text_only', postcat_return) + # api call: container_post - post_action: exec - cmd: mailq - task: unhold + def container_post__exec__mailq__unhold(self, request_json, **kwargs): + if 'container_id' in kwargs: + filters = {"id": kwargs['container_id']} + elif 'container_name' in kwargs: + filters = {"name": kwargs['container_name']} + + if 'items' in request_json: + r = re.compile("^[0-9a-fA-F]+$") + filtered_qids = filter(r.match, request_json['items']) + if filtered_qids: + flagged_qids = ['-H %s' % i for i in filtered_qids] + sanitized_string = str(' '.join(flagged_qids)) + for container in self.sync_docker_client.containers.list(filters=filters): + postsuper_r = container.exec_run(["/bin/bash", "-c", "/usr/sbin/postsuper " + sanitized_string]) + return self.exec_run_handler('generic', postsuper_r) + # api call: container_post - post_action: exec - cmd: mailq - task: deliver + def container_post__exec__mailq__deliver(self, request_json, **kwargs): + if 'container_id' in kwargs: + filters = {"id": kwargs['container_id']} + elif 'container_name' in kwargs: + filters = {"name": kwargs['container_name']} + + if 'items' in request_json: + r = re.compile("^[0-9a-fA-F]+$") + filtered_qids = filter(r.match, request_json['items']) + if filtered_qids: + flagged_qids = ['-i %s' % i for i in filtered_qids] + for container in self.sync_docker_client.containers.list(filters=filters): + for i in flagged_qids: + postqueue_r = container.exec_run(["/bin/bash", "-c", "/usr/sbin/postqueue " + i], user='postfix') + # todo: check each exit code + res = { 'type': 'success', 'msg': 'Scheduled immediate delivery'} + return Response(content=json.dumps(res, indent=4), media_type="application/json") + # api call: container_post - post_action: exec - cmd: mailq - task: list + def container_post__exec__mailq__list(self, request_json, **kwargs): + if 'container_id' in kwargs: + filters = {"id": kwargs['container_id']} + elif 'container_name' in kwargs: + filters = {"name": kwargs['container_name']} + + for container in self.sync_docker_client.containers.list(filters=filters): + mailq_return = container.exec_run(["/usr/sbin/postqueue", "-j"], user='postfix') + return self.exec_run_handler('utf8_text_only', mailq_return) + # api call: container_post - post_action: exec - cmd: mailq - task: flush + def container_post__exec__mailq__flush(self, request_json, **kwargs): + if 'container_id' in kwargs: + filters = {"id": kwargs['container_id']} + elif 'container_name' in kwargs: + filters = {"name": kwargs['container_name']} + + for container in self.sync_docker_client.containers.list(filters=filters): + postqueue_r = container.exec_run(["/usr/sbin/postqueue", "-f"], user='postfix') + return self.exec_run_handler('generic', postqueue_r) + # api call: container_post - post_action: exec - cmd: mailq - task: super_delete + def container_post__exec__mailq__super_delete(self, request_json, **kwargs): + if 'container_id' in kwargs: + filters = {"id": kwargs['container_id']} + elif 'container_name' in kwargs: + filters = {"name": kwargs['container_name']} + + for container in self.sync_docker_client.containers.list(filters=filters): + postsuper_r = container.exec_run(["/usr/sbin/postsuper", "-d", "ALL"]) + return self.exec_run_handler('generic', postsuper_r) + # api call: container_post - post_action: exec - cmd: system - task: fts_rescan + def container_post__exec__system__fts_rescan(self, request_json, **kwargs): + if 'container_id' in kwargs: + filters = {"id": kwargs['container_id']} + elif 'container_name' in kwargs: + filters = {"name": kwargs['container_name']} + + if 'username' in request_json: + for container in self.sync_docker_client.containers.list(filters=filters): + rescan_return = container.exec_run(["/bin/bash", "-c", "/usr/bin/doveadm fts rescan -u '" + request_json['username'].replace("'", "'\\''") + "'"], user='vmail') + if rescan_return.exit_code == 0: + res = { 'type': 'success', 'msg': 'fts_rescan: rescan triggered'} + return Response(content=json.dumps(res, indent=4), media_type="application/json") + else: + res = { 'type': 'warning', 'msg': 'fts_rescan error'} + return Response(content=json.dumps(res, indent=4), media_type="application/json") + if 'all' in request_json: + for container in self.sync_docker_client.containers.list(filters=filters): + rescan_return = container.exec_run(["/bin/bash", "-c", "/usr/bin/doveadm fts rescan -A"], user='vmail') + if rescan_return.exit_code == 0: + res = { 'type': 'success', 'msg': 'fts_rescan: rescan triggered'} + return Response(content=json.dumps(res, indent=4), media_type="application/json") + else: + res = { 'type': 'warning', 'msg': 'fts_rescan error'} + return Response(content=json.dumps(res, indent=4), media_type="application/json") + # api call: container_post - post_action: exec - cmd: system - task: df + def container_post__exec__system__df(self, request_json, **kwargs): + if 'container_id' in kwargs: + filters = {"id": kwargs['container_id']} + elif 'container_name' in kwargs: + filters = {"name": kwargs['container_name']} + + if 'dir' in request_json: + for container in self.sync_docker_client.containers.list(filters=filters): + df_return = container.exec_run(["/bin/bash", "-c", "/bin/df -H '" + request_json['dir'].replace("'", "'\\''") + "' | /usr/bin/tail -n1 | /usr/bin/tr -s [:blank:] | /usr/bin/tr ' ' ','"], user='nobody') + if df_return.exit_code == 0: + return df_return.output.decode('utf-8').rstrip() + else: + return "0,0,0,0,0,0" + # api call: container_post - post_action: exec - cmd: system - task: mysql_upgrade + def container_post__exec__system__mysql_upgrade(self, request_json, **kwargs): + if 'container_id' in kwargs: + filters = {"id": kwargs['container_id']} + elif 'container_name' in kwargs: + filters = {"name": kwargs['container_name']} + + for container in self.sync_docker_client.containers.list(filters=filters): + sql_return = container.exec_run(["/bin/bash", "-c", "/usr/bin/mysql_upgrade -uroot -p'" + os.environ['DBROOT'].replace("'", "'\\''") + "'\n"], user='mysql') + if sql_return.exit_code == 0: + matched = False + for line in sql_return.output.decode('utf-8').split("\n"): + if 'is already upgraded to' in line: + matched = True + if matched: + res = { 'type': 'success', 'msg':'mysql_upgrade: already upgraded', 'text': sql_return.output.decode('utf-8')} + return Response(content=json.dumps(res, indent=4), media_type="application/json") + else: + container.restart() + res = { 'type': 'warning', 'msg':'mysql_upgrade: upgrade was applied', 'text': sql_return.output.decode('utf-8')} + return Response(content=json.dumps(res, indent=4), media_type="application/json") + else: + res = { 'type': 'error', 'msg': 'mysql_upgrade: error running command', 'text': sql_return.output.decode('utf-8')} + return Response(content=json.dumps(res, indent=4), media_type="application/json") + # api call: container_post - post_action: exec - cmd: system - task: mysql_tzinfo_to_sql + def container_post__exec__system__mysql_tzinfo_to_sql(self, request_json, **kwargs): + if 'container_id' in kwargs: + filters = {"id": kwargs['container_id']} + elif 'container_name' in kwargs: + filters = {"name": kwargs['container_name']} + + for container in self.sync_docker_client.containers.list(filters=filters): + sql_return = container.exec_run(["/bin/bash", "-c", "/usr/bin/mysql_tzinfo_to_sql /usr/share/zoneinfo | /bin/sed 's/Local time zone must be set--see zic manual page/FCTY/' | /usr/bin/mysql -uroot -p'" + os.environ['DBROOT'].replace("'", "'\\''") + "' mysql \n"], user='mysql') + if sql_return.exit_code == 0: + res = { 'type': 'info', 'msg': 'mysql_tzinfo_to_sql: command completed successfully', 'text': sql_return.output.decode('utf-8')} + return Response(content=json.dumps(res, indent=4), media_type="application/json") + else: + res = { 'type': 'error', 'msg': 'mysql_tzinfo_to_sql: error running command', 'text': sql_return.output.decode('utf-8')} + return Response(content=json.dumps(res, indent=4), media_type="application/json") + # api call: container_post - post_action: exec - cmd: reload - task: dovecot + def container_post__exec__reload__dovecot(self, request_json, **kwargs): + if 'container_id' in kwargs: + filters = {"id": kwargs['container_id']} + elif 'container_name' in kwargs: + filters = {"name": kwargs['container_name']} + + for container in self.sync_docker_client.containers.list(filters=filters): + reload_return = container.exec_run(["/bin/bash", "-c", "/usr/sbin/dovecot reload"]) + return self.exec_run_handler('generic', reload_return) + # api call: container_post - post_action: exec - cmd: reload - task: postfix + def container_post__exec__reload__postfix(self, request_json, **kwargs): + if 'container_id' in kwargs: + filters = {"id": kwargs['container_id']} + elif 'container_name' in kwargs: + filters = {"name": kwargs['container_name']} + + for container in self.sync_docker_client.containers.list(filters=filters): + reload_return = container.exec_run(["/bin/bash", "-c", "/usr/sbin/postfix reload"]) + return self.exec_run_handler('generic', reload_return) + # api call: container_post - post_action: exec - cmd: reload - task: nginx + def container_post__exec__reload__nginx(self, request_json, **kwargs): + if 'container_id' in kwargs: + filters = {"id": kwargs['container_id']} + elif 'container_name' in kwargs: + filters = {"name": kwargs['container_name']} + + for container in self.sync_docker_client.containers.list(filters=filters): + reload_return = container.exec_run(["/bin/sh", "-c", "/usr/sbin/nginx -s reload"]) + return self.exec_run_handler('generic', reload_return) + # api call: container_post - post_action: exec - cmd: sieve - task: list + def container_post__exec__sieve__list(self, request_json, **kwargs): + if 'container_id' in kwargs: + filters = {"id": kwargs['container_id']} + elif 'container_name' in kwargs: + filters = {"name": kwargs['container_name']} + + if 'username' in request_json: + for container in self.sync_docker_client.containers.list(filters=filters): + sieve_return = container.exec_run(["/bin/bash", "-c", "/usr/bin/doveadm sieve list -u '" + request_json['username'].replace("'", "'\\''") + "'"]) + return self.exec_run_handler('utf8_text_only', sieve_return) + # api call: container_post - post_action: exec - cmd: sieve - task: print + def container_post__exec__sieve__print(self, request_json, **kwargs): + if 'container_id' in kwargs: + filters = {"id": kwargs['container_id']} + elif 'container_name' in kwargs: + filters = {"name": kwargs['container_name']} + + if 'username' in request_json and 'script_name' in request_json: + for container in self.sync_docker_client.containers.list(filters=filters): + cmd = ["/bin/bash", "-c", "/usr/bin/doveadm sieve get -u '" + request_json['username'].replace("'", "'\\''") + "' '" + request_json['script_name'].replace("'", "'\\''") + "'"] + sieve_return = container.exec_run(cmd) + return self.exec_run_handler('utf8_text_only', sieve_return) + # api call: container_post - post_action: exec - cmd: maildir - task: cleanup + def container_post__exec__maildir__cleanup(self, request_json, **kwargs): + if 'container_id' in kwargs: + filters = {"id": kwargs['container_id']} + elif 'container_name' in kwargs: + filters = {"name": kwargs['container_name']} + + if 'maildir' in request_json: + for container in self.sync_docker_client.containers.list(filters=filters): + sane_name = re.sub(r'\W+', '', request_json['maildir']) + vmail_name = request_json['maildir'].replace("'", "'\\''") + cmd_vmail = "if [[ -d '/var/vmail/" + vmail_name + "' ]]; then /bin/mv '/var/vmail/" + vmail_name + "' '/var/vmail/_garbage/" + str(int(time.time())) + "_" + sane_name + "'; fi" + index_name = request_json['maildir'].split("/") + if len(index_name) > 1: + index_name = index_name[1].replace("'", "'\\''") + "@" + index_name[0].replace("'", "'\\''") + cmd_vmail_index = "if [[ -d '/var/vmail_index/" + index_name + "' ]]; then /bin/mv '/var/vmail_index/" + index_name + "' '/var/vmail/_garbage/" + str(int(time.time())) + "_" + sane_name + "_index'; fi" + cmd = ["/bin/bash", "-c", cmd_vmail + " && " + cmd_vmail_index] + else: + cmd = ["/bin/bash", "-c", cmd_vmail] + maildir_cleanup = container.exec_run(cmd, user='vmail') + return self.exec_run_handler('generic', maildir_cleanup) + # api call: container_post - post_action: exec - cmd: rspamd - task: worker_password + def container_post__exec__rspamd__worker_password(self, request_json, **kwargs): + if 'container_id' in kwargs: + filters = {"id": kwargs['container_id']} + elif 'container_name' in kwargs: + filters = {"name": kwargs['container_name']} + + if 'raw' in request_json: + for container in self.sync_docker_client.containers.list(filters=filters): + cmd = "/usr/bin/rspamadm pw -e -p '" + request_json['raw'].replace("'", "'\\''") + "' 2> /dev/null" + cmd_response = self.exec_cmd_container(container, cmd, user="_rspamd") + + matched = False + for line in cmd_response.split("\n"): + if '$2$' in line: + hash = line.strip() + hash_out = re.search('\$2\$.+$', hash).group(0) + rspamd_passphrase_hash = re.sub('[^0-9a-zA-Z\$]+', '', hash_out.rstrip()) + rspamd_password_filename = "/etc/rspamd/override.d/worker-controller-password.inc" + cmd = '''/bin/echo 'enable_password = "%s";' > %s && cat %s''' % (rspamd_passphrase_hash, rspamd_password_filename, rspamd_password_filename) + cmd_response = self.exec_cmd_container(container, cmd, user="_rspamd") + if rspamd_passphrase_hash.startswith("$2$") and rspamd_passphrase_hash in cmd_response: + container.restart() + matched = True + if matched: + res = { 'type': 'success', 'msg': 'command completed successfully' } + self.logger.info('success changing Rspamd password') + return Response(content=json.dumps(res, indent=4), media_type="application/json") + else: + self.logger.error('failed changing Rspamd password') + res = { 'type': 'danger', 'msg': 'command did not complete' } + return Response(content=json.dumps(res, indent=4), media_type="application/json") + + # Collect host stats + async def get_host_stats(self, wait=5): + try: + system_time = datetime.now() + host_stats = { + "cpu": { + "cores": psutil.cpu_count(), + "usage": psutil.cpu_percent() + }, + "memory": { + "total": psutil.virtual_memory().total, + "usage": psutil.virtual_memory().percent, + "swap": psutil.swap_memory() + }, + "uptime": time.time() - psutil.boot_time(), + "system_time": system_time.strftime("%d.%m.%Y %H:%M:%S"), + "architecture": platform.machine() + } + + await self.redis_client.set('host_stats', json.dumps(host_stats), ex=10) + except Exception as e: + res = { + "type": "danger", + "msg": str(e) + } + + await asyncio.sleep(wait) + self.host_stats_isUpdating = False + # Collect container stats + async def get_container_stats(self, container_id, wait=5, stop=False): + if container_id and container_id.isalnum(): + try: + for container in (await self.async_docker_client.containers.list()): + if container._id == container_id: + res = await container.stats(stream=False) + + if await self.redis_client.exists(container_id + '_stats'): + stats = json.loads(await self.redis_client.get(container_id + '_stats')) + else: + stats = [] + stats.append(res[0]) + if len(stats) > 3: + del stats[0] + await self.redis_client.set(container_id + '_stats', json.dumps(stats), ex=60) + except Exception as e: + res = { + "type": "danger", + "msg": str(e) + } + else: + res = { + "type": "danger", + "msg": "no or invalid id defined" + } + + await asyncio.sleep(wait) + if stop == True: + # update task was called second time, stop + self.containerIds_to_update.remove(container_id) + else: + # call update task a second time + await self.get_container_stats(container_id, wait=0, stop=True) + + def exec_cmd_container(self, container, cmd, user, timeout=2, shell_cmd="/bin/bash"): + def recv_socket_data(c_socket, timeout): + c_socket.setblocking(0) + total_data=[] + data='' + begin=time.time() + while True: + if total_data and time.time()-begin > timeout: + break + elif time.time()-begin > timeout*2: + break + try: + data = c_socket.recv(8192) + if data: + total_data.append(data.decode('utf-8')) + #change the beginning time for measurement + begin=time.time() + else: + #sleep for sometime to indicate a gap + time.sleep(0.1) + break + except: + pass + return ''.join(total_data) + + try : + socket = container.exec_run([shell_cmd], stdin=True, socket=True, user=user).output._sock + if not cmd.endswith("\n"): + cmd = cmd + "\n" + socket.send(cmd.encode('utf-8')) + data = recv_socket_data(socket, timeout) + socket.close() + return data + except Exception as e: + self.logger.error("error - exec_cmd_container: %s" % str(e)) + traceback.print_exc(file=sys.stdout) + + def exec_run_handler(self, type, output): + if type == 'generic': + if output.exit_code == 0: + res = { 'type': 'success', 'msg': 'command completed successfully' } + return Response(content=json.dumps(res, indent=4), media_type="application/json") + else: + res = { 'type': 'danger', 'msg': 'command failed: ' + output.output.decode('utf-8') } + return Response(content=json.dumps(res, indent=4), media_type="application/json") + if type == 'utf8_text_only': + return Response(content=output.output.decode('utf-8'), media_type="text/plain") diff --git a/data/Dockerfiles/dockerapi/modules/__init__.py b/data/Dockerfiles/dockerapi/modules/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/data/web/inc/functions.docker.inc.php b/data/web/inc/functions.docker.inc.php index 78efac03..5b5b7ace 100644 --- a/data/web/inc/functions.docker.inc.php +++ b/data/web/inc/functions.docker.inc.php @@ -192,5 +192,16 @@ function docker($action, $service_name = null, $attr1 = null, $attr2 = null, $ex } return false; break; + case 'broadcast': + $request = array( + "api_call" => "container_post", + "container_name" => $service_name, + "post_action" => $attr1, + "request" => $attr2 + ); + + $redis->publish("MC_CHANNEL", json_encode($request)); + return true; + break; } } diff --git a/data/web/inc/functions.mailbox.inc.php b/data/web/inc/functions.mailbox.inc.php index 4a2aa6a3..a06e5c22 100644 --- a/data/web/inc/functions.mailbox.inc.php +++ b/data/web/inc/functions.mailbox.inc.php @@ -4930,14 +4930,7 @@ function mailbox($_action, $_type, $_data = null, $_extra = null) { if (!empty($mailbox_details['domain']) && !empty($mailbox_details['local_part'])) { $maildir = $mailbox_details['domain'] . '/' . $mailbox_details['local_part']; $exec_fields = array('cmd' => 'maildir', 'task' => 'cleanup', 'maildir' => $maildir); - $maildir_gc = json_decode(docker('post', 'dovecot-mailcow', 'exec', $exec_fields), true); - if ($maildir_gc['type'] != 'success') { - $_SESSION['return'][] = array( - 'type' => 'warning', - 'log' => array(__FUNCTION__, $_action, $_type, $_data_log, $_attr), - 'msg' => 'Could not move maildir to garbage collector: ' . $maildir_gc['msg'] - ); - } + docker('broadcast', 'dovecot-mailcow', 'exec', $exec_fields); } else { $_SESSION['return'][] = array( From 0f6956572e760f6b303902f95de5898848d28cff Mon Sep 17 00:00:00 2001 From: FreddleSpl0it Date: Fri, 7 Jul 2023 09:58:51 +0200 Subject: [PATCH 2/3] [Web] add CLUSTERMODE environment variable --- data/web/inc/functions.mailbox.inc.php | 15 ++++++++++++++- docker-compose.yml | 1 + 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/data/web/inc/functions.mailbox.inc.php b/data/web/inc/functions.mailbox.inc.php index a06e5c22..8bc6da22 100644 --- a/data/web/inc/functions.mailbox.inc.php +++ b/data/web/inc/functions.mailbox.inc.php @@ -4930,7 +4930,20 @@ function mailbox($_action, $_type, $_data = null, $_extra = null) { if (!empty($mailbox_details['domain']) && !empty($mailbox_details['local_part'])) { $maildir = $mailbox_details['domain'] . '/' . $mailbox_details['local_part']; $exec_fields = array('cmd' => 'maildir', 'task' => 'cleanup', 'maildir' => $maildir); - docker('broadcast', 'dovecot-mailcow', 'exec', $exec_fields); + + if (getenv("CLUSTERMODE") == "replication") { + // broadcast to each dovecot container + docker('broadcast', 'dovecot-mailcow', 'exec', $exec_fields); + } else { + $maildir_gc = json_decode(docker('post', 'dovecot-mailcow', 'exec', $exec_fields), true); + if ($maildir_gc['type'] != 'success') { + $_SESSION['return'][] = array( + 'type' => 'warning', + 'log' => array(__FUNCTION__, $_action, $_type, $_data_log, $_attr), + 'msg' => 'Could not move maildir to garbage collector: ' . $maildir_gc['msg'] + ); + } + } } else { $_SESSION['return'][] = array( diff --git a/docker-compose.yml b/docker-compose.yml index a5a8f95b..4c854aeb 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -162,6 +162,7 @@ services: - DEV_MODE=${DEV_MODE:-n} - DEMO_MODE=${DEMO_MODE:-n} - WEBAUTHN_ONLY_TRUSTED_VENDORS=${WEBAUTHN_ONLY_TRUSTED_VENDORS:-n} + - CLUSTERMODE=${CLUSTERMODE:-} restart: always networks: mailcow-network: From 0f0d43b253185e0980d96e8a7cdc98c6cd0ae457 Mon Sep 17 00:00:00 2001 From: FreddleSpl0it Date: Fri, 7 Jul 2023 11:32:28 +0200 Subject: [PATCH 3/3] [Dockerapi] add missing import os --- data/Dockerfiles/dockerapi/modules/DockerApi.py | 1 + 1 file changed, 1 insertion(+) diff --git a/data/Dockerfiles/dockerapi/modules/DockerApi.py b/data/Dockerfiles/dockerapi/modules/DockerApi.py index 6db77119..ea1c104e 100644 --- a/data/Dockerfiles/dockerapi/modules/DockerApi.py +++ b/data/Dockerfiles/dockerapi/modules/DockerApi.py @@ -1,5 +1,6 @@ import psutil import sys +import os import re import time import json