[BS5] update async dockerapi

This commit is contained in:
FreddleSpl0it 2022-08-23 11:57:05 +02:00
parent db8af3d1e0
commit be7252f620
4 changed files with 534 additions and 1010 deletions

View File

@ -15,13 +15,7 @@ RUN apk add --update --no-cache python3 \
aiodocker \ aiodocker \
redis redis
RUN openssl req -x509 -newkey rsa:4096 -sha256 -days 3650 -nodes \ COPY docker-entrypoint.sh /app/
-keyout /app/dockerapi_key.pem \
-out /app/dockerapi_cert.pem \
-subj /CN=dockerapi/O=mailcow \
-addext subjectAltName=DNS:dockerapi
COPY dockerapi.py /app/ COPY dockerapi.py /app/
COPY async-dockerapi.py /app/
CMD ["uvicorn", "--host", "0.0.0.0", "--port", "443", "--ssl-certfile=/app/dockerapi_cert.pem", "--ssl-keyfile=/app/dockerapi_key.pem", "async-dockerapi:app"] ENTRYPOINT ["/bin/sh", "/app/docker-entrypoint.sh"]

View File

@ -1,635 +0,0 @@
from fastapi import FastAPI, Response, Request
import aiodocker
import psutil
import sys
import re
import time
import os
import json
import asyncio
import redis
from datetime import datetime
containerIds_to_update = []
host_stats_isUpdating = False
app = FastAPI()
@app.get("/host/stats")
async def get_host_update_stats():
global host_stats_isUpdating
if host_stats_isUpdating == False:
print("start host stats task")
asyncio.create_task(get_host_stats())
host_stats_isUpdating = True
while True:
if redis_client.exists('host_stats'):
break
print("wait for host_stats results")
await asyncio.sleep(1.5)
print("host stats pulled")
stats = json.loads(redis_client.get('host_stats'))
return Response(content=json.dumps(stats, indent=4), media_type="application/json")
@app.get("/containers/{container_id}/json")
async def get_container(container_id : str):
if container_id and container_id.isalnum():
try:
for container in (await async_docker_client.containers.list()):
if container._id == container_id:
container_info = await container.show()
return Response(content=json.dumps(container_info, indent=4), media_type="application/json")
res = {
"type": "danger",
"msg": "no container found"
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
except Exception as e:
res = {
"type": "danger",
"msg": str(e)
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
else:
res = {
"type": "danger",
"msg": "no or invalid id defined"
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
@app.get("/containers/json")
async def get_containers():
containers = {}
try:
for container in (await async_docker_client.containers.list()):
container_info = await container.show()
containers.update({container_info['Id']: container_info})
return Response(content=json.dumps(containers, indent=4), media_type="application/json")
except Exception as e:
res = {
"type": "danger",
"msg": str(e)
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
@app.post("/containers/{container_id}/{post_action}")
async def post_containers(container_id : str, post_action : str, request: Request):
try :
request_json = await request.json()
except Exception as err:
request_json = {}
if container_id and container_id.isalnum() and post_action:
try:
"""Dispatch container_post api call"""
if post_action == 'exec':
if not request_json or not 'cmd' in request_json:
res = {
"type": "danger",
"msg": "cmd is missing"
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
if not request_json or not 'task' in request_json:
res = {
"type": "danger",
"msg": "task is missing"
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
api_call_method_name = '__'.join(['container_post', str(post_action), str(request_json['cmd']), str(request_json['task']) ])
else:
api_call_method_name = '__'.join(['container_post', str(post_action) ])
docker_utils = DockerUtils(async_docker_client)
api_call_method = getattr(docker_utils, api_call_method_name, lambda container_id: Response(content=json.dumps({'type': 'danger', 'msg':'container_post - unknown api call' }, indent=4), media_type="application/json"))
print("api call: %s, container_id: %s" % (api_call_method_name, container_id))
return await api_call_method(container_id, request_json)
except Exception as e:
print("error - container_post: %s" % str(e))
res = {
"type": "danger",
"msg": str(e)
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
else:
res = {
"type": "danger",
"msg": "invalid container id or missing action"
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
@app.post("/container/{container_id}/stats/update")
async def post_container_update_stats(container_id : str):
global containerIds_to_update
# start update task for container if no task is running
if container_id not in containerIds_to_update:
asyncio.create_task(get_container_stats(container_id))
containerIds_to_update.append(container_id)
while True:
if redis_client.exists(container_id + '_stats'):
break
await asyncio.sleep(1.5)
stats = json.loads(redis_client.get(container_id + '_stats'))
return Response(content=json.dumps(stats, indent=4), media_type="application/json")
class DockerUtils:
def __init__(self, docker_client):
self.docker_client = docker_client
# api call: container_post - post_action: stop
async def container_post__stop(self, container_id, request_json):
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
await container.stop()
res = {
'type': 'success',
'msg': 'command completed successfully'
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
# api call: container_post - post_action: start
async def container_post__start(self, container_id, request_json):
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
await container.start()
res = {
'type': 'success',
'msg': 'command completed successfully'
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
# api call: container_post - post_action: restart
async def container_post__restart(self, container_id, request_json):
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
await container.restart()
res = {
'type': 'success',
'msg': 'command completed successfully'
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
# api call: container_post - post_action: top
async def container_post__top(self, container_id, request_json):
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
ps_exec = await container.exec("ps")
async with ps_exec.start(detach=False) as stream:
ps_return = await stream.read_out()
exec_details = await ps_exec.inspect()
if exec_details["ExitCode"] == None or exec_details["ExitCode"] == 0:
res = {
'type': 'success',
'msg': ps_return.data.decode('utf-8')
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
else:
res = {
'type': 'danger',
'msg': ''
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
# api call: container_post - post_action: exec - cmd: mailq - task: delete
async def container_post__exec__mailq__delete(self, container_id, request_json):
if 'items' in request_json:
r = re.compile("^[0-9a-fA-F]+$")
filtered_qids = filter(r.match, request_json['items'])
if filtered_qids:
flagged_qids = ['-d %s' % i for i in filtered_qids]
sanitized_string = str(' '.join(flagged_qids))
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
postsuper_r_exec = await container.exec(["/bin/bash", "-c", "/usr/sbin/postsuper " + sanitized_string])
return await exec_run_handler('generic', postsuper_r_exec)
# api call: container_post - post_action: exec - cmd: mailq - task: hold
async def container_post__exec__mailq__hold(self, container_id, request_json):
if 'items' in request_json:
r = re.compile("^[0-9a-fA-F]+$")
filtered_qids = filter(r.match, request_json['items'])
if filtered_qids:
flagged_qids = ['-h %s' % i for i in filtered_qids]
sanitized_string = str(' '.join(flagged_qids))
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
postsuper_r_exec = await container.exec(["/bin/bash", "-c", "/usr/sbin/postsuper " + sanitized_string])
return await exec_run_handler('generic', postsuper_r_exec)
# api call: container_post - post_action: exec - cmd: mailq - task: cat
async def container_post__exec__mailq__cat(self, container_id, request_json):
if 'items' in request_json:
r = re.compile("^[0-9a-fA-F]+$")
filtered_qids = filter(r.match, request_json['items'])
if filtered_qids:
sanitized_string = str(' '.join(filtered_qids))
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
postcat_exec = await container.exec(["/bin/bash", "-c", "/usr/sbin/postcat -q " + sanitized_string], user='postfix')
return exec_run_handler('utf8_text_only', postcat_exec)
# api call: container_post - post_action: exec - cmd: mailq - task: unhold
async def container_post__exec__mailq__unhold(self, container_id, request_json):
if 'items' in request_json:
r = re.compile("^[0-9a-fA-F]+$")
filtered_qids = filter(r.match, request_json['items'])
if filtered_qids:
flagged_qids = ['-H %s' % i for i in filtered_qids]
sanitized_string = str(' '.join(flagged_qids))
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
postsuper_r_exec = await container.exec(["/bin/bash", "-c", "/usr/sbin/postsuper " + sanitized_string])
return await exec_run_handler('generic', postsuper_r_exec)
# api call: container_post - post_action: exec - cmd: mailq - task: deliver
async def container_post__exec__mailq__deliver(self, container_id, request_json):
if 'items' in request_json:
r = re.compile("^[0-9a-fA-F]+$")
filtered_qids = filter(r.match, request_json['items'])
if filtered_qids:
flagged_qids = ['-i %s' % i for i in filtered_qids]
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
for i in flagged_qids:
postsuper_r_exec = await container.exec(["/bin/bash", "-c", "/usr/sbin/postqueue " + i], user='postfix')
async with postsuper_r_exec.start(detach=False) as stream:
postsuper_r_return = await stream.read_out()
# todo: check each exit code
res = {
'type': 'success',
'msg': 'Scheduled immediate delivery'
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
# api call: container_post - post_action: exec - cmd: mailq - task: list
async def container_post__exec__mailq__list(self, container_id, request_json):
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
mailq_exec = await container.exec(["/usr/sbin/postqueue", "-j"], user='postfix')
return await exec_run_handler('utf8_text_only', mailq_exec)
# api call: container_post - post_action: exec - cmd: mailq - task: flush
async def container_post__exec__mailq__flush(self, container_id, request_json):
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
postsuper_r_exec = await container.exec(["/usr/sbin/postqueue", "-f"], user='postfix')
return await exec_run_handler('generic', postsuper_r_exec)
# api call: container_post - post_action: exec - cmd: mailq - task: super_delete
async def container_post__exec__mailq__super_delete(self, container_id, request_json):
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
postsuper_r_exec = await container.exec(["/usr/sbin/postsuper", "-d", "ALL"])
return await exec_run_handler('generic', postsuper_r_exec)
# api call: container_post - post_action: exec - cmd: system - task: fts_rescan
async def container_post__exec__system__fts_rescan(self, container_id, request_json):
if 'username' in request_json:
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
rescan_exec = await container.exec(["/bin/bash", "-c", "/usr/bin/doveadm fts rescan -u '" + request_json['username'].replace("'", "'\\''") + "'"], user='vmail')
async with rescan_exec.start(detach=False) as stream:
rescan_return = await stream.read_out()
exec_details = await rescan_exec.inspect()
if exec_details["ExitCode"] == None or exec_details["ExitCode"] == 0:
res = {
'type': 'success',
'msg': 'fts_rescan: rescan triggered'
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
else:
res = {
'type': 'warning',
'msg': 'fts_rescan error'
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
if 'all' in request_json:
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
rescan_exec = await container.exec(["/bin/bash", "-c", "/usr/bin/doveadm fts rescan -A"], user='vmail')
async with rescan_exec.start(detach=False) as stream:
rescan_return = await stream.read_out()
exec_details = await rescan_exec.inspect()
if exec_details["ExitCode"] == None or exec_details["ExitCode"] == 0:
res = {
'type': 'success',
'msg': 'fts_rescan: rescan triggered'
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
else:
res = {
'type': 'warning',
'msg': 'fts_rescan error'
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
# api call: container_post - post_action: exec - cmd: system - task: df
async def container_post__exec__system__df(self, container_id, request_json):
if 'dir' in request_json:
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
df_exec = await container.exec(["/bin/bash", "-c", "/bin/df -H '" + request_json['dir'].replace("'", "'\\''") + "' | /usr/bin/tail -n1 | /usr/bin/tr -s [:blank:] | /usr/bin/tr ' ' ','"], user='nobody')
async with df_exec.start(detach=False) as stream:
df_return = await stream.read_out()
print(df_return)
print(await df_exec.inspect())
exec_details = await df_exec.inspect()
if exec_details["ExitCode"] == None or exec_details["ExitCode"] == 0:
return df_return.data.decode('utf-8').rstrip()
else:
return "0,0,0,0,0,0"
# api call: container_post - post_action: exec - cmd: system - task: mysql_upgrade
async def container_post__exec__system__mysql_upgrade(self, container_id, request_json):
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
sql_exec = await container.exec(["/bin/bash", "-c", "/usr/bin/mysql_upgrade -uroot -p'" + os.environ['DBROOT'].replace("'", "'\\''") + "'\n"], user='mysql')
async with sql_exec.start(detach=False) as stream:
sql_return = await stream.read_out()
exec_details = await sql_exec.inspect()
if exec_details["ExitCode"] == None or exec_details["ExitCode"] == 0:
matched = False
for line in sql_return.data.decode('utf-8').split("\n"):
if 'is already upgraded to' in line:
matched = True
if matched:
res = {
'type': 'success',
'msg': 'mysql_upgrade: already upgraded',
'text': sql_return.data.decode('utf-8')
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
else:
await container.restart()
res = {
'type': 'warning',
'msg': 'mysql_upgrade: upgrade was applied',
'text': sql_return.data.decode('utf-8')
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
else:
res = {
'type': 'error',
'msg': 'mysql_upgrade: error running command',
'text': sql_return.data.decode('utf-8')
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
# api call: container_post - post_action: exec - cmd: system - task: mysql_tzinfo_to_sql
async def container_post__exec__system__mysql_tzinfo_to_sql(self, container_id, request_json):
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
sql_exec = await container.exec(["/bin/bash", "-c", "/usr/bin/mysql_tzinfo_to_sql /usr/share/zoneinfo | /bin/sed 's/Local time zone must be set--see zic manual page/FCTY/' | /usr/bin/mysql -uroot -p'" + os.environ['DBROOT'].replace("'", "'\\''") + "' mysql \n"], user='mysql')
async with sql_exec.start(detach=False) as stream:
sql_return = await stream.read_out()
exec_details = await sql_exec.inspect()
if exec_details["ExitCode"] == None or exec_details["ExitCode"] == 0:
res = {
'type': 'info',
'msg': 'mysql_tzinfo_to_sql: command completed successfully',
'text': sql_return.data.decode('utf-8')
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
else:
res = {
'type': 'error',
'msg': 'mysql_tzinfo_to_sql: error running command',
'text': sql_return.data.decode('utf-8')
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
# api call: container_post - post_action: exec - cmd: reload - task: dovecot
async def container_post__exec__reload__dovecot(self, container_id, request_json):
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
reload_exec = await container.exec(["/bin/bash", "-c", "/usr/sbin/dovecot reload"])
return await exec_run_handler('generic', reload_exec)
# api call: container_post - post_action: exec - cmd: reload - task: postfix
async def container_post__exec__reload__postfix(self, container_id, request_json):
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
reload_exec = await container.exec(["/bin/bash", "-c", "/usr/sbin/postfix reload"])
return await exec_run_handler('generic', reload_exec)
# api call: container_post - post_action: exec - cmd: reload - task: nginx
async def container_post__exec__reload__nginx(self, container_id, request_json):
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
reload_exec = await container.exec(["/bin/sh", "-c", "/usr/sbin/nginx -s reload"])
return await exec_run_handler('generic', reload_exec)
# api call: container_post - post_action: exec - cmd: sieve - task: list
async def container_post__exec__sieve__list(self, container_id, request_json):
if 'username' in request_json:
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
sieve_exec = await container.exec(["/bin/bash", "-c", "/usr/bin/doveadm sieve list -u '" + request_json['username'].replace("'", "'\\''") + "'"])
return await exec_run_handler('utf8_text_only', sieve_exec)
# api call: container_post - post_action: exec - cmd: sieve - task: print
async def container_post__exec__sieve__print(self, container_id, request_json):
if 'username' in request_json and 'script_name' in request_json:
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
cmd = ["/bin/bash", "-c", "/usr/bin/doveadm sieve get -u '" + request_json['username'].replace("'", "'\\''") + "' '" + request_json['script_name'].replace("'", "'\\''") + "'"]
sieve_exec = await container.exec(cmd)
return await exec_run_handler('utf8_text_only', sieve_exec)
# api call: container_post - post_action: exec - cmd: maildir - task: cleanup
async def container_post__exec__maildir__cleanup(self, container_id, request_json):
if 'maildir' in request_json:
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
sane_name = re.sub(r'\W+', '', request_json['maildir'])
cmd = ["/bin/bash", "-c", "if [[ -d '/var/vmail/" + request_json['maildir'].replace("'", "'\\''") + "' ]]; then /bin/mv '/var/vmail/" + request_json['maildir'].replace("'", "'\\''") + "' '/var/vmail/_garbage/" + str(int(time.time())) + "_" + sane_name + "'; fi"]
maildir_cleanup_exec = await container.exec(cmd, user='vmail')
return await exec_run_handler('generic', maildir_cleanup_exec)
# api call: container_post - post_action: exec - cmd: rspamd - task: worker_password
async def container_post__exec__rspamd__worker_password(self, container_id, request_json):
if 'raw' in request_json:
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
cmd = "/usr/bin/rspamadm pw -e -p '" + request_json['raw'].replace("'", "'\\''") + "' 2> /dev/null"
rspamd_password_exec = await container.exec(cmd, user='_rspamd')
async with rspamd_password_exec.start(detach=False) as stream:
rspamd_password_return = await stream.read_out()
matched = False
for line in rspamd_password_return.data.decode('utf-8').split("\n"):
if '$2$' in line:
hash = line.strip()
hash_out = re.search('\$2\$.+$', hash).group(0)
rspamd_passphrase_hash = re.sub('[^0-9a-zA-Z\$]+', '', hash_out.rstrip())
rspamd_password_filename = "/etc/rspamd/override.d/worker-controller-password.inc"
cmd = '''/bin/echo 'enable_password = "%s";' > %s && cat %s''' % (rspamd_passphrase_hash, rspamd_password_filename, rspamd_password_filename)
rspamd_password_exec = await container.exec(cmd, user='_rspamd')
async with rspamd_password_exec.start(detach=False) as stream:
rspamd_password_return = await stream.read_out()
if rspamd_passphrase_hash.startswith("$2$") and rspamd_passphrase_hash in rspamd_password_return.data.decode('utf-8'):
await container.restart()
matched = True
if matched:
res = {
'type': 'success',
'msg': 'command completed successfully'
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
else:
res = {
'type': 'danger',
'msg': 'command did not complete'
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
async def exec_run_handler(type, exec_obj):
async with exec_obj.start(detach=False) as stream:
exec_return = await stream.read_out()
if exec_return == None:
exec_return = ""
else:
exec_return = exec_return.data.decode('utf-8')
if type == 'generic':
exec_details = await exec_obj.inspect()
if exec_details["ExitCode"] == None or exec_details["ExitCode"] == 0:
res = {
"type": "success",
"msg": "command completed successfully"
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
else:
res = {
"type": "success",
"msg": "'command failed: " + exec_return
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
if type == 'utf8_text_only':
return Response(content=exec_return, media_type="text/plain")
async def get_host_stats(wait=5):
global host_stats_isUpdating
try:
system_time = datetime.now()
host_stats = {
"cpu": {
"cores": psutil.cpu_count(),
"usage": psutil.cpu_percent()
},
"memory": {
"total": psutil.virtual_memory().total,
"usage": psutil.virtual_memory().percent,
"swap": psutil.swap_memory()
},
"uptime": time.time() - psutil.boot_time(),
"system_time": system_time.strftime("%d.%m.%Y %H:%M:%S")
}
redis_client.set('host_stats', json.dumps(host_stats), ex=10)
except Exception as e:
res = {
"type": "danger",
"msg": str(e)
}
print(json.dumps(res, indent=4))
await asyncio.sleep(wait)
host_stats_isUpdating = False
async def get_container_stats(container_id, wait=5, stop=False):
global containerIds_to_update
if container_id and container_id.isalnum():
try:
for container in (await async_docker_client.containers.list()):
if container._id == container_id:
res = await container.stats(stream=False)
if redis_client.exists(container_id + '_stats'):
stats = json.loads(redis_client.get(container_id + '_stats'))
else:
stats = []
stats.append(res[0])
if len(stats) > 3:
del stats[0]
redis_client.set(container_id + '_stats', json.dumps(stats), ex=60)
except Exception as e:
res = {
"type": "danger",
"msg": str(e)
}
print(json.dumps(res, indent=4))
else:
res = {
"type": "danger",
"msg": "no or invalid id defined"
}
print(json.dumps(res, indent=4))
await asyncio.sleep(wait)
if stop == True:
# update task was called second time, stop
containerIds_to_update.remove(container_id)
else:
# call update task a second time
await get_container_stats(container_id, wait=0, stop=True)
if os.environ['REDIS_SLAVEOF_IP'] != "":
redis_client = redis.Redis(host=os.environ['REDIS_SLAVEOF_IP'], port=os.environ['REDIS_SLAVEOF_PORT'], db=0)
else:
redis_client = redis.Redis(host='redis-mailcow', port=6379, db=0)
async_docker_client = aiodocker.Docker(url='unix:///var/run/docker.sock')

View File

@ -0,0 +1,9 @@
#!/bin/bash
`openssl req -x509 -newkey rsa:4096 -sha256 -days 3650 -nodes \
-keyout /app/dockerapi_key.pem \
-out /app/dockerapi_cert.pem \
-subj /CN=dockerapi/O=mailcow \
-addext subjectAltName=DNS:dockerapi`
`uvicorn --host 0.0.0.0 --port 443 --ssl-certfile=/app/dockerapi_cert.pem --ssl-keyfile=/app/dockerapi_key.pem dockerapi:app`

View File

@ -1,316 +1,506 @@
#!/usr/bin/env python3 from fastapi import FastAPI, Response, Request
import aiodocker
from flask import Flask import psutil
from flask_restful import Resource, Api import sys
from flask import jsonify import re
from flask import Response
from flask import request
from threading import Thread
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import docker
import uuid
import signal
import time import time
import os import os
import re import json
import sys import asyncio
import ssl import redis
import socket from datetime import datetime
import subprocess
import traceback
import psutil
docker_client = docker.DockerClient(base_url='unix://var/run/docker.sock', version='auto')
app = Flask(__name__)
api = Api(app)
class containers_get(Resource): containerIds_to_update = []
def get(self): host_stats_isUpdating = False
containers = {} app = FastAPI()
try:
for container in docker_client.containers.list(all=True):
containers.update({container.attrs['Id']: container.attrs})
return containers
except Exception as e:
return jsonify(type='danger', msg=str(e))
class container_get(Resource):
def get(self, container_id): @app.get("/host/stats")
async def get_host_update_stats():
global host_stats_isUpdating
if host_stats_isUpdating == False:
print("start host stats task")
asyncio.create_task(get_host_stats())
host_stats_isUpdating = True
while True:
if redis_client.exists('host_stats'):
break
print("wait for host_stats results")
await asyncio.sleep(1.5)
print("host stats pulled")
stats = json.loads(redis_client.get('host_stats'))
return Response(content=json.dumps(stats, indent=4), media_type="application/json")
@app.get("/containers/{container_id}/json")
async def get_container(container_id : str):
if container_id and container_id.isalnum(): if container_id and container_id.isalnum():
try: try:
for container in docker_client.containers.list(all=True, filters={"id": container_id}): for container in (await async_docker_client.containers.list()):
return container.attrs if container._id == container_id:
except Exception as e: container_info = await container.show()
return jsonify(type='danger', msg=str(e)) return Response(content=json.dumps(container_info, indent=4), media_type="application/json")
else:
return jsonify(type='danger', msg='no or invalid id defined') res = {
"type": "danger",
"msg": "no container found"
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
except Exception as e:
res = {
"type": "danger",
"msg": str(e)
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
else:
res = {
"type": "danger",
"msg": "no or invalid id defined"
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
@app.get("/containers/json")
async def get_containers():
containers = {}
try:
for container in (await async_docker_client.containers.list()):
container_info = await container.show()
containers.update({container_info['Id']: container_info})
return Response(content=json.dumps(containers, indent=4), media_type="application/json")
except Exception as e:
res = {
"type": "danger",
"msg": str(e)
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
@app.post("/containers/{container_id}/{post_action}")
async def post_containers(container_id : str, post_action : str, request: Request):
try :
request_json = await request.json()
except Exception as err:
request_json = {}
class container_post(Resource):
def post(self, container_id, post_action):
if container_id and container_id.isalnum() and post_action: if container_id and container_id.isalnum() and post_action:
try: try:
"""Dispatch container_post api call""" """Dispatch container_post api call"""
if post_action == 'exec': if post_action == 'exec':
if not request.json or not 'cmd' in request.json: if not request_json or not 'cmd' in request_json:
return jsonify(type='danger', msg='cmd is missing') res = {
if not request.json or not 'task' in request.json: "type": "danger",
return jsonify(type='danger', msg='task is missing') "msg": "cmd is missing"
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
if not request_json or not 'task' in request_json:
res = {
"type": "danger",
"msg": "task is missing"
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
api_call_method_name = '__'.join(['container_post', str(post_action), str(request.json['cmd']), str(request.json['task']) ]) api_call_method_name = '__'.join(['container_post', str(post_action), str(request_json['cmd']), str(request_json['task']) ])
else: else:
api_call_method_name = '__'.join(['container_post', str(post_action) ]) api_call_method_name = '__'.join(['container_post', str(post_action) ])
api_call_method = getattr(self, api_call_method_name, lambda container_id: jsonify(type='danger', msg='container_post - unknown api call')) docker_utils = DockerUtils(async_docker_client)
api_call_method = getattr(docker_utils, api_call_method_name, lambda container_id: Response(content=json.dumps({'type': 'danger', 'msg':'container_post - unknown api call' }, indent=4), media_type="application/json"))
print("api call: %s, container_id: %s" % (api_call_method_name, container_id)) print("api call: %s, container_id: %s" % (api_call_method_name, container_id))
return api_call_method(container_id) return await api_call_method(container_id, request_json)
except Exception as e: except Exception as e:
print("error - container_post: %s" % str(e)) print("error - container_post: %s" % str(e))
return jsonify(type='danger', msg=str(e)) res = {
"type": "danger",
"msg": str(e)
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
else: else:
return jsonify(type='danger', msg='invalid container id or missing action') res = {
"type": "danger",
"msg": "invalid container id or missing action"
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
@app.post("/container/{container_id}/stats/update")
async def post_container_update_stats(container_id : str):
global containerIds_to_update
# start update task for container if no task is running
if container_id not in containerIds_to_update:
asyncio.create_task(get_container_stats(container_id))
containerIds_to_update.append(container_id)
while True:
if redis_client.exists(container_id + '_stats'):
break
await asyncio.sleep(1.5)
stats = json.loads(redis_client.get(container_id + '_stats'))
return Response(content=json.dumps(stats, indent=4), media_type="application/json")
class DockerUtils:
def __init__(self, docker_client):
self.docker_client = docker_client
# api call: container_post - post_action: stop # api call: container_post - post_action: stop
def container_post__stop(self, container_id): async def container_post__stop(self, container_id, request_json):
for container in docker_client.containers.list(all=True, filters={"id": container_id}): for container in (await self.docker_client.containers.list()):
container.stop() if container._id == container_id:
return jsonify(type='success', msg='command completed successfully') await container.stop()
res = {
'type': 'success',
'msg': 'command completed successfully'
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
# api call: container_post - post_action: start # api call: container_post - post_action: start
def container_post__start(self, container_id): async def container_post__start(self, container_id, request_json):
for container in docker_client.containers.list(all=True, filters={"id": container_id}): for container in (await self.docker_client.containers.list()):
container.start() if container._id == container_id:
return jsonify(type='success', msg='command completed successfully') await container.start()
res = {
'type': 'success',
'msg': 'command completed successfully'
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
# api call: container_post - post_action: restart # api call: container_post - post_action: restart
def container_post__restart(self, container_id): async def container_post__restart(self, container_id, request_json):
for container in docker_client.containers.list(all=True, filters={"id": container_id}): for container in (await self.docker_client.containers.list()):
container.restart() if container._id == container_id:
return jsonify(type='success', msg='command completed successfully') await container.restart()
res = {
'type': 'success',
'msg': 'command completed successfully'
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
# api call: container_post - post_action: top # api call: container_post - post_action: top
def container_post__top(self, container_id): async def container_post__top(self, container_id, request_json):
for container in docker_client.containers.list(all=True, filters={"id": container_id}): for container in (await self.docker_client.containers.list()):
return jsonify(type='success', msg=container.top()) if container._id == container_id:
ps_exec = await container.exec("ps")
async with ps_exec.start(detach=False) as stream:
ps_return = await stream.read_out()
exec_details = await ps_exec.inspect()
# api call: container_post - post_action: stats if exec_details["ExitCode"] == None or exec_details["ExitCode"] == 0:
def container_post__stats(self, container_id): res = {
for container in docker_client.containers.list(all=True, filters={"id": container_id}): 'type': 'success',
for stat in container.stats(decode=True, stream=True): 'msg': ps_return.data.decode('utf-8')
return jsonify(type='success', msg=stat ) }
return Response(content=json.dumps(res, indent=4), media_type="application/json")
else:
res = {
'type': 'danger',
'msg': ''
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
# api call: container_post - post_action: exec - cmd: mailq - task: delete # api call: container_post - post_action: exec - cmd: mailq - task: delete
def container_post__exec__mailq__delete(self, container_id): async def container_post__exec__mailq__delete(self, container_id, request_json):
if 'items' in request.json: if 'items' in request_json:
r = re.compile("^[0-9a-fA-F]+$") r = re.compile("^[0-9a-fA-F]+$")
filtered_qids = filter(r.match, request.json['items']) filtered_qids = filter(r.match, request_json['items'])
if filtered_qids: if filtered_qids:
flagged_qids = ['-d %s' % i for i in filtered_qids] flagged_qids = ['-d %s' % i for i in filtered_qids]
sanitized_string = str(' '.join(flagged_qids)); sanitized_string = str(' '.join(flagged_qids))
for container in docker_client.containers.list(filters={"id": container_id}): for container in (await self.docker_client.containers.list()):
postsuper_r = container.exec_run(["/bin/bash", "-c", "/usr/sbin/postsuper " + sanitized_string]) if container._id == container_id:
return exec_run_handler('generic', postsuper_r) postsuper_r_exec = await container.exec(["/bin/bash", "-c", "/usr/sbin/postsuper " + sanitized_string])
return await exec_run_handler('generic', postsuper_r_exec)
# api call: container_post - post_action: exec - cmd: mailq - task: hold # api call: container_post - post_action: exec - cmd: mailq - task: hold
def container_post__exec__mailq__hold(self, container_id): async def container_post__exec__mailq__hold(self, container_id, request_json):
if 'items' in request.json: if 'items' in request_json:
r = re.compile("^[0-9a-fA-F]+$") r = re.compile("^[0-9a-fA-F]+$")
filtered_qids = filter(r.match, request.json['items']) filtered_qids = filter(r.match, request_json['items'])
if filtered_qids: if filtered_qids:
flagged_qids = ['-h %s' % i for i in filtered_qids] flagged_qids = ['-h %s' % i for i in filtered_qids]
sanitized_string = str(' '.join(flagged_qids)); sanitized_string = str(' '.join(flagged_qids))
for container in docker_client.containers.list(filters={"id": container_id}): for container in (await self.docker_client.containers.list()):
postsuper_r = container.exec_run(["/bin/bash", "-c", "/usr/sbin/postsuper " + sanitized_string]) if container._id == container_id:
return exec_run_handler('generic', postsuper_r) postsuper_r_exec = await container.exec(["/bin/bash", "-c", "/usr/sbin/postsuper " + sanitized_string])
return await exec_run_handler('generic', postsuper_r_exec)
# api call: container_post - post_action: exec - cmd: mailq - task: cat # api call: container_post - post_action: exec - cmd: mailq - task: cat
def container_post__exec__mailq__cat(self, container_id): async def container_post__exec__mailq__cat(self, container_id, request_json):
if 'items' in request.json: if 'items' in request_json:
r = re.compile("^[0-9a-fA-F]+$") r = re.compile("^[0-9a-fA-F]+$")
filtered_qids = filter(r.match, request.json['items']) filtered_qids = filter(r.match, request_json['items'])
if filtered_qids: if filtered_qids:
sanitized_string = str(' '.join(filtered_qids)); sanitized_string = str(' '.join(filtered_qids))
for container in docker_client.containers.list(filters={"id": container_id}): for container in (await self.docker_client.containers.list()):
postcat_return = container.exec_run(["/bin/bash", "-c", "/usr/sbin/postcat -q " + sanitized_string], user='postfix') if container._id == container_id:
if not postcat_return: postcat_exec = await container.exec(["/bin/bash", "-c", "/usr/sbin/postcat -q " + sanitized_string], user='postfix')
postcat_return = 'err: invalid' return exec_run_handler('utf8_text_only', postcat_exec)
return exec_run_handler('utf8_text_only', postcat_return)
# api call: container_post - post_action: exec - cmd: mailq - task: unhold # api call: container_post - post_action: exec - cmd: mailq - task: unhold
def container_post__exec__mailq__unhold(self, container_id): async def container_post__exec__mailq__unhold(self, container_id, request_json):
if 'items' in request.json: if 'items' in request_json:
r = re.compile("^[0-9a-fA-F]+$") r = re.compile("^[0-9a-fA-F]+$")
filtered_qids = filter(r.match, request.json['items']) filtered_qids = filter(r.match, request_json['items'])
if filtered_qids: if filtered_qids:
flagged_qids = ['-H %s' % i for i in filtered_qids] flagged_qids = ['-H %s' % i for i in filtered_qids]
sanitized_string = str(' '.join(flagged_qids)); sanitized_string = str(' '.join(flagged_qids))
for container in docker_client.containers.list(filters={"id": container_id}): for container in (await self.docker_client.containers.list()):
postsuper_r = container.exec_run(["/bin/bash", "-c", "/usr/sbin/postsuper " + sanitized_string]) if container._id == container_id:
return exec_run_handler('generic', postsuper_r) postsuper_r_exec = await container.exec(["/bin/bash", "-c", "/usr/sbin/postsuper " + sanitized_string])
return await exec_run_handler('generic', postsuper_r_exec)
# api call: container_post - post_action: exec - cmd: mailq - task: deliver # api call: container_post - post_action: exec - cmd: mailq - task: deliver
def container_post__exec__mailq__deliver(self, container_id): async def container_post__exec__mailq__deliver(self, container_id, request_json):
if 'items' in request.json: if 'items' in request_json:
r = re.compile("^[0-9a-fA-F]+$") r = re.compile("^[0-9a-fA-F]+$")
filtered_qids = filter(r.match, request.json['items']) filtered_qids = filter(r.match, request_json['items'])
if filtered_qids: if filtered_qids:
flagged_qids = ['-i %s' % i for i in filtered_qids] flagged_qids = ['-i %s' % i for i in filtered_qids]
for container in docker_client.containers.list(filters={"id": container_id}): for container in (await self.docker_client.containers.list()):
if container._id == container_id:
for i in flagged_qids: for i in flagged_qids:
postqueue_r = container.exec_run(["/bin/bash", "-c", "/usr/sbin/postqueue " + i], user='postfix') postsuper_r_exec = await container.exec(["/bin/bash", "-c", "/usr/sbin/postqueue " + i], user='postfix')
async with postsuper_r_exec.start(detach=False) as stream:
postsuper_r_return = await stream.read_out()
# todo: check each exit code # todo: check each exit code
return jsonify(type='success', msg=str("Scheduled immediate delivery")) res = {
'type': 'success',
'msg': 'Scheduled immediate delivery'
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
# api call: container_post - post_action: exec - cmd: mailq - task: list # api call: container_post - post_action: exec - cmd: mailq - task: list
def container_post__exec__mailq__list(self, container_id): async def container_post__exec__mailq__list(self, container_id, request_json):
for container in docker_client.containers.list(filters={"id": container_id}): for container in (await self.docker_client.containers.list()):
mailq_return = container.exec_run(["/usr/sbin/postqueue", "-j"], user='postfix') if container._id == container_id:
return exec_run_handler('utf8_text_only', mailq_return) mailq_exec = await container.exec(["/usr/sbin/postqueue", "-j"], user='postfix')
return await exec_run_handler('utf8_text_only', mailq_exec)
# api call: container_post - post_action: exec - cmd: mailq - task: flush # api call: container_post - post_action: exec - cmd: mailq - task: flush
def container_post__exec__mailq__flush(self, container_id): async def container_post__exec__mailq__flush(self, container_id, request_json):
for container in docker_client.containers.list(filters={"id": container_id}): for container in (await self.docker_client.containers.list()):
postqueue_r = container.exec_run(["/usr/sbin/postqueue", "-f"], user='postfix') if container._id == container_id:
return exec_run_handler('generic', postqueue_r) postsuper_r_exec = await container.exec(["/usr/sbin/postqueue", "-f"], user='postfix')
return await exec_run_handler('generic', postsuper_r_exec)
# api call: container_post - post_action: exec - cmd: mailq - task: super_delete # api call: container_post - post_action: exec - cmd: mailq - task: super_delete
def container_post__exec__mailq__super_delete(self, container_id): async def container_post__exec__mailq__super_delete(self, container_id, request_json):
for container in docker_client.containers.list(filters={"id": container_id}): for container in (await self.docker_client.containers.list()):
postsuper_r = container.exec_run(["/usr/sbin/postsuper", "-d", "ALL"]) if container._id == container_id:
return exec_run_handler('generic', postsuper_r) postsuper_r_exec = await container.exec(["/usr/sbin/postsuper", "-d", "ALL"])
return await exec_run_handler('generic', postsuper_r_exec)
# api call: container_post - post_action: exec - cmd: system - task: fts_rescan # api call: container_post - post_action: exec - cmd: system - task: fts_rescan
def container_post__exec__system__fts_rescan(self, container_id): async def container_post__exec__system__fts_rescan(self, container_id, request_json):
if 'username' in request.json: if 'username' in request_json:
for container in docker_client.containers.list(filters={"id": container_id}): for container in (await self.docker_client.containers.list()):
rescan_return = container.exec_run(["/bin/bash", "-c", "/usr/bin/doveadm fts rescan -u '" + request.json['username'].replace("'", "'\\''") + "'"], user='vmail') if container._id == container_id:
if rescan_return.exit_code == 0: rescan_exec = await container.exec(["/bin/bash", "-c", "/usr/bin/doveadm fts rescan -u '" + request_json['username'].replace("'", "'\\''") + "'"], user='vmail')
return jsonify(type='success', msg='fts_rescan: rescan triggered') async with rescan_exec.start(detach=False) as stream:
else: rescan_return = await stream.read_out()
return jsonify(type='warning', msg='fts_rescan error')
if 'all' in request.json: exec_details = await rescan_exec.inspect()
for container in docker_client.containers.list(filters={"id": container_id}): if exec_details["ExitCode"] == None or exec_details["ExitCode"] == 0:
rescan_return = container.exec_run(["/bin/bash", "-c", "/usr/bin/doveadm fts rescan -A"], user='vmail') res = {
if rescan_return.exit_code == 0: 'type': 'success',
return jsonify(type='success', msg='fts_rescan: rescan triggered') 'msg': 'fts_rescan: rescan triggered'
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
else: else:
return jsonify(type='warning', msg='fts_rescan error') res = {
'type': 'warning',
'msg': 'fts_rescan error'
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
if 'all' in request_json:
for container in (await self.docker_client.containers.list()):
if container._id == container_id:
rescan_exec = await container.exec(["/bin/bash", "-c", "/usr/bin/doveadm fts rescan -A"], user='vmail')
async with rescan_exec.start(detach=False) as stream:
rescan_return = await stream.read_out()
exec_details = await rescan_exec.inspect()
if exec_details["ExitCode"] == None or exec_details["ExitCode"] == 0:
res = {
'type': 'success',
'msg': 'fts_rescan: rescan triggered'
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
else:
res = {
'type': 'warning',
'msg': 'fts_rescan error'
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
# api call: container_post - post_action: exec - cmd: system - task: df # api call: container_post - post_action: exec - cmd: system - task: df
def container_post__exec__system__df(self, container_id): async def container_post__exec__system__df(self, container_id, request_json):
if 'dir' in request.json: if 'dir' in request_json:
for container in docker_client.containers.list(filters={"id": container_id}): for container in (await self.docker_client.containers.list()):
df_return = container.exec_run(["/bin/bash", "-c", "/bin/df -H '" + request.json['dir'].replace("'", "'\\''") + "' | /usr/bin/tail -n1 | /usr/bin/tr -s [:blank:] | /usr/bin/tr ' ' ','"], user='nobody') if container._id == container_id:
if df_return.exit_code == 0: df_exec = await container.exec(["/bin/bash", "-c", "/bin/df -H '" + request_json['dir'].replace("'", "'\\''") + "' | /usr/bin/tail -n1 | /usr/bin/tr -s [:blank:] | /usr/bin/tr ' ' ','"], user='nobody')
return df_return.output.decode('utf-8').rstrip() async with df_exec.start(detach=False) as stream:
df_return = await stream.read_out()
print(df_return)
print(await df_exec.inspect())
exec_details = await df_exec.inspect()
if exec_details["ExitCode"] == None or exec_details["ExitCode"] == 0:
return df_return.data.decode('utf-8').rstrip()
else: else:
return "0,0,0,0,0,0" return "0,0,0,0,0,0"
# api call: container_post - post_action: exec - cmd: system - task: mysql_upgrade # api call: container_post - post_action: exec - cmd: system - task: mysql_upgrade
def container_post__exec__system__mysql_upgrade(self, container_id): async def container_post__exec__system__mysql_upgrade(self, container_id, request_json):
for container in docker_client.containers.list(filters={"id": container_id}): for container in (await self.docker_client.containers.list()):
sql_return = container.exec_run(["/bin/bash", "-c", "/usr/bin/mysql_upgrade -uroot -p'" + os.environ['DBROOT'].replace("'", "'\\''") + "'\n"], user='mysql') if container._id == container_id:
if sql_return.exit_code == 0: sql_exec = await container.exec(["/bin/bash", "-c", "/usr/bin/mysql_upgrade -uroot -p'" + os.environ['DBROOT'].replace("'", "'\\''") + "'\n"], user='mysql')
async with sql_exec.start(detach=False) as stream:
sql_return = await stream.read_out()
exec_details = await sql_exec.inspect()
if exec_details["ExitCode"] == None or exec_details["ExitCode"] == 0:
matched = False matched = False
for line in sql_return.output.decode('utf-8').split("\n"): for line in sql_return.data.decode('utf-8').split("\n"):
if 'is already upgraded to' in line: if 'is already upgraded to' in line:
matched = True matched = True
if matched: if matched:
return jsonify(type='success', msg='mysql_upgrade: already upgraded', text=sql_return.output.decode('utf-8')) res = {
'type': 'success',
'msg': 'mysql_upgrade: already upgraded',
'text': sql_return.data.decode('utf-8')
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
else: else:
container.restart() await container.restart()
return jsonify(type='warning', msg='mysql_upgrade: upgrade was applied', text=sql_return.output.decode('utf-8')) res = {
'type': 'warning',
'msg': 'mysql_upgrade: upgrade was applied',
'text': sql_return.data.decode('utf-8')
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
else: else:
return jsonify(type='error', msg='mysql_upgrade: error running command', text=sql_return.output.decode('utf-8')) res = {
'type': 'error',
'msg': 'mysql_upgrade: error running command',
'text': sql_return.data.decode('utf-8')
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
# api call: container_post - post_action: exec - cmd: system - task: mysql_tzinfo_to_sql # api call: container_post - post_action: exec - cmd: system - task: mysql_tzinfo_to_sql
def container_post__exec__system__mysql_tzinfo_to_sql(self, container_id): async def container_post__exec__system__mysql_tzinfo_to_sql(self, container_id, request_json):
for container in docker_client.containers.list(filters={"id": container_id}): for container in (await self.docker_client.containers.list()):
sql_return = container.exec_run(["/bin/bash", "-c", "/usr/bin/mysql_tzinfo_to_sql /usr/share/zoneinfo | /bin/sed 's/Local time zone must be set--see zic manual page/FCTY/' | /usr/bin/mysql -uroot -p'" + os.environ['DBROOT'].replace("'", "'\\''") + "' mysql \n"], user='mysql') if container._id == container_id:
if sql_return.exit_code == 0: sql_exec = await container.exec(["/bin/bash", "-c", "/usr/bin/mysql_tzinfo_to_sql /usr/share/zoneinfo | /bin/sed 's/Local time zone must be set--see zic manual page/FCTY/' | /usr/bin/mysql -uroot -p'" + os.environ['DBROOT'].replace("'", "'\\''") + "' mysql \n"], user='mysql')
return jsonify(type='info', msg='mysql_tzinfo_to_sql: command completed successfully', text=sql_return.output.decode('utf-8')) async with sql_exec.start(detach=False) as stream:
sql_return = await stream.read_out()
exec_details = await sql_exec.inspect()
if exec_details["ExitCode"] == None or exec_details["ExitCode"] == 0:
res = {
'type': 'info',
'msg': 'mysql_tzinfo_to_sql: command completed successfully',
'text': sql_return.data.decode('utf-8')
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
else: else:
return jsonify(type='error', msg='mysql_tzinfo_to_sql: error running command', text=sql_return.output.decode('utf-8')) res = {
'type': 'error',
'msg': 'mysql_tzinfo_to_sql: error running command',
'text': sql_return.data.decode('utf-8')
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
# api call: container_post - post_action: exec - cmd: reload - task: dovecot # api call: container_post - post_action: exec - cmd: reload - task: dovecot
def container_post__exec__reload__dovecot(self, container_id): async def container_post__exec__reload__dovecot(self, container_id, request_json):
for container in docker_client.containers.list(filters={"id": container_id}): for container in (await self.docker_client.containers.list()):
reload_return = container.exec_run(["/bin/bash", "-c", "/usr/sbin/dovecot reload"]) if container._id == container_id:
return exec_run_handler('generic', reload_return) reload_exec = await container.exec(["/bin/bash", "-c", "/usr/sbin/dovecot reload"])
return await exec_run_handler('generic', reload_exec)
# api call: container_post - post_action: exec - cmd: reload - task: postfix # api call: container_post - post_action: exec - cmd: reload - task: postfix
def container_post__exec__reload__postfix(self, container_id): async def container_post__exec__reload__postfix(self, container_id, request_json):
for container in docker_client.containers.list(filters={"id": container_id}): for container in (await self.docker_client.containers.list()):
reload_return = container.exec_run(["/bin/bash", "-c", "/usr/sbin/postfix reload"]) if container._id == container_id:
return exec_run_handler('generic', reload_return) reload_exec = await container.exec(["/bin/bash", "-c", "/usr/sbin/postfix reload"])
return await exec_run_handler('generic', reload_exec)
# api call: container_post - post_action: exec - cmd: reload - task: nginx # api call: container_post - post_action: exec - cmd: reload - task: nginx
def container_post__exec__reload__nginx(self, container_id): async def container_post__exec__reload__nginx(self, container_id, request_json):
for container in docker_client.containers.list(filters={"id": container_id}): for container in (await self.docker_client.containers.list()):
reload_return = container.exec_run(["/bin/sh", "-c", "/usr/sbin/nginx -s reload"]) if container._id == container_id:
return exec_run_handler('generic', reload_return) reload_exec = await container.exec(["/bin/sh", "-c", "/usr/sbin/nginx -s reload"])
return await exec_run_handler('generic', reload_exec)
# api call: container_post - post_action: exec - cmd: sieve - task: list # api call: container_post - post_action: exec - cmd: sieve - task: list
def container_post__exec__sieve__list(self, container_id): async def container_post__exec__sieve__list(self, container_id, request_json):
if 'username' in request.json: if 'username' in request_json:
for container in docker_client.containers.list(filters={"id": container_id}): for container in (await self.docker_client.containers.list()):
sieve_return = container.exec_run(["/bin/bash", "-c", "/usr/bin/doveadm sieve list -u '" + request.json['username'].replace("'", "'\\''") + "'"]) if container._id == container_id:
return exec_run_handler('utf8_text_only', sieve_return) sieve_exec = await container.exec(["/bin/bash", "-c", "/usr/bin/doveadm sieve list -u '" + request_json['username'].replace("'", "'\\''") + "'"])
return await exec_run_handler('utf8_text_only', sieve_exec)
# api call: container_post - post_action: exec - cmd: sieve - task: print # api call: container_post - post_action: exec - cmd: sieve - task: print
def container_post__exec__sieve__print(self, container_id): async def container_post__exec__sieve__print(self, container_id, request_json):
if 'username' in request.json and 'script_name' in request.json: if 'username' in request_json and 'script_name' in request_json:
for container in docker_client.containers.list(filters={"id": container_id}): for container in (await self.docker_client.containers.list()):
cmd = ["/bin/bash", "-c", "/usr/bin/doveadm sieve get -u '" + request.json['username'].replace("'", "'\\''") + "' '" + request.json['script_name'].replace("'", "'\\''") + "'"] if container._id == container_id:
sieve_return = container.exec_run(cmd) cmd = ["/bin/bash", "-c", "/usr/bin/doveadm sieve get -u '" + request_json['username'].replace("'", "'\\''") + "' '" + request_json['script_name'].replace("'", "'\\''") + "'"]
return exec_run_handler('utf8_text_only', sieve_return) sieve_exec = await container.exec(cmd)
return await exec_run_handler('utf8_text_only', sieve_exec)
# api call: container_post - post_action: exec - cmd: maildir - task: cleanup # api call: container_post - post_action: exec - cmd: maildir - task: cleanup
def container_post__exec__maildir__cleanup(self, container_id): async def container_post__exec__maildir__cleanup(self, container_id, request_json):
if 'maildir' in request.json: if 'maildir' in request_json:
for container in docker_client.containers.list(filters={"id": container_id}): for container in (await self.docker_client.containers.list()):
sane_name = re.sub(r'\W+', '', request.json['maildir']) if container._id == container_id:
cmd = ["/bin/bash", "-c", "if [[ -d '/var/vmail/" + request.json['maildir'].replace("'", "'\\''") + "' ]]; then /bin/mv '/var/vmail/" + request.json['maildir'].replace("'", "'\\''") + "' '/var/vmail/_garbage/" + str(int(time.time())) + "_" + sane_name + "'; fi"] sane_name = re.sub(r'\W+', '', request_json['maildir'])
maildir_cleanup = container.exec_run(cmd, user='vmail') cmd = ["/bin/bash", "-c", "if [[ -d '/var/vmail/" + request_json['maildir'].replace("'", "'\\''") + "' ]]; then /bin/mv '/var/vmail/" + request_json['maildir'].replace("'", "'\\''") + "' '/var/vmail/_garbage/" + str(int(time.time())) + "_" + sane_name + "'; fi"]
return exec_run_handler('generic', maildir_cleanup) maildir_cleanup_exec = await container.exec(cmd, user='vmail')
return await exec_run_handler('generic', maildir_cleanup_exec)
# api call: container_post - post_action: exec - cmd: rspamd - task: worker_password # api call: container_post - post_action: exec - cmd: rspamd - task: worker_password
def container_post__exec__rspamd__worker_password(self, container_id): async def container_post__exec__rspamd__worker_password(self, container_id, request_json):
if 'raw' in request.json: if 'raw' in request_json:
for container in docker_client.containers.list(filters={"id": container_id}): for container in (await self.docker_client.containers.list()):
cmd = "/usr/bin/rspamadm pw -e -p '" + request.json['raw'].replace("'", "'\\''") + "' 2> /dev/null" if container._id == container_id:
cmd_response = exec_cmd_container(container, cmd, user="_rspamd")
cmd = "/usr/bin/rspamadm pw -e -p '" + request_json['raw'].replace("'", "'\\''") + "' 2> /dev/null"
rspamd_password_exec = await container.exec(cmd, user='_rspamd')
async with rspamd_password_exec.start(detach=False) as stream:
rspamd_password_return = await stream.read_out()
matched = False matched = False
for line in cmd_response.split("\n"): for line in rspamd_password_return.data.decode('utf-8').split("\n"):
if '$2$' in line: if '$2$' in line:
hash = line.strip() hash = line.strip()
hash_out = re.search('\$2\$.+$', hash).group(0) hash_out = re.search('\$2\$.+$', hash).group(0)
@ -318,46 +508,60 @@ class container_post(Resource):
rspamd_password_filename = "/etc/rspamd/override.d/worker-controller-password.inc" rspamd_password_filename = "/etc/rspamd/override.d/worker-controller-password.inc"
cmd = '''/bin/echo 'enable_password = "%s";' > %s && cat %s''' % (rspamd_passphrase_hash, rspamd_password_filename, rspamd_password_filename) cmd = '''/bin/echo 'enable_password = "%s";' > %s && cat %s''' % (rspamd_passphrase_hash, rspamd_password_filename, rspamd_password_filename)
cmd_response = exec_cmd_container(container, cmd, user="_rspamd") rspamd_password_exec = await container.exec(cmd, user='_rspamd')
async with rspamd_password_exec.start(detach=False) as stream:
rspamd_password_return = await stream.read_out()
if rspamd_passphrase_hash.startswith("$2$") and rspamd_passphrase_hash in cmd_response: if rspamd_passphrase_hash.startswith("$2$") and rspamd_passphrase_hash in rspamd_password_return.data.decode('utf-8'):
container.restart() await container.restart()
matched = True matched = True
if matched: if matched:
return jsonify(type='success', msg='command completed successfully') res = {
'type': 'success',
'msg': 'command completed successfully'
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
else: else:
return jsonify(type='danger', msg='command did not complete') res = {
'type': 'danger',
'msg': 'command did not complete'
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
async def exec_run_handler(type, exec_obj):
async with exec_obj.start(detach=False) as stream:
exec_return = await stream.read_out()
if exec_return == None:
exec_return = ""
else:
exec_return = exec_return.data.decode('utf-8')
if type == 'generic':
exec_details = await exec_obj.inspect()
if exec_details["ExitCode"] == None or exec_details["ExitCode"] == 0:
res = {
"type": "success",
"msg": "command completed successfully"
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
else:
res = {
"type": "success",
"msg": "'command failed: " + exec_return
}
return Response(content=json.dumps(res, indent=4), media_type="application/json")
if type == 'utf8_text_only':
return Response(content=exec_return, media_type="text/plain")
async def get_host_stats(wait=5):
global host_stats_isUpdating
class host_stats_get(Resource):
def get(self):
try: try:
system_time = datetime.now() system_time = datetime.now()
# get docker stats multithreaded for faster results
containers = docker_client.containers.list()
workers = os.cpu_count() * 4
with ThreadPoolExecutor(workers) as pool:
stats = list(pool.map(lambda x: x.stats(decode=None, stream=False), containers))
bytes_recv_total = 0
bytes_sent_total = 0
bytes_read_total = 0
bytes_write_total = 0
for stat in stats:
if "networks" in stat:
for interface in stat["networks"]:
bytes_recv_total += stat["networks"][interface]["rx_bytes"]
bytes_sent_total += stat["networks"][interface]["tx_bytes"]
if "blkio_stats" in stat:
if "io_service_bytes_recursive" in stat["blkio_stats"]:
if hasattr(stat["blkio_stats"]["io_service_bytes_recursive"], "__len__"):
for blkio in stat["blkio_stats"]["io_service_bytes_recursive"]:
if blkio["op"] == "read":
bytes_read_total += blkio["value"]
elif blkio["op"] == "write":
bytes_write_total += blkio["value"]
host_stats = { host_stats = {
"cpu": { "cpu": {
"cores": psutil.cpu_count(), "cores": psutil.cpu_count(),
@ -368,112 +572,64 @@ class host_stats_get(Resource):
"usage": psutil.virtual_memory().percent, "usage": psutil.virtual_memory().percent,
"swap": psutil.swap_memory() "swap": psutil.swap_memory()
}, },
"disk": {
"bytes_read_total": bytes_read_total,
"bytes_write_total": bytes_write_total
},
"network": {
"bytes_recv_total": bytes_recv_total,
"bytes_sent_total": bytes_sent_total
},
"container_stats": stats,
"uptime": time.time() - psutil.boot_time(), "uptime": time.time() - psutil.boot_time(),
"system_time": system_time.strftime("%d.%m.%Y %H:%M:%S") "system_time": system_time.strftime("%d.%m.%Y %H:%M:%S")
} }
return host_stats
redis_client.set('host_stats', json.dumps(host_stats), ex=10)
except Exception as e: except Exception as e:
return jsonify(type='danger', msg=str(e)) res = {
"type": "danger",
"msg": str(e)
}
print(json.dumps(res, indent=4))
await asyncio.sleep(wait)
host_stats_isUpdating = False
def exec_cmd_container(container, cmd, user, timeout=2, shell_cmd="/bin/bash"): async def get_container_stats(container_id, wait=5, stop=False):
global containerIds_to_update
def recv_socket_data(c_socket, timeout): if container_id and container_id.isalnum():
c_socket.setblocking(0)
total_data=[];
data='';
begin=time.time()
while True:
if total_data and time.time()-begin > timeout:
break
elif time.time()-begin > timeout*2:
break
try: try:
data = c_socket.recv(8192) for container in (await async_docker_client.containers.list()):
if data: if container._id == container_id:
total_data.append(data.decode('utf-8')) res = await container.stats(stream=False)
#change the beginning time for measurement
begin=time.time() if redis_client.exists(container_id + '_stats'):
stats = json.loads(redis_client.get(container_id + '_stats'))
else: else:
#sleep for sometime to indicate a gap stats = []
time.sleep(0.1) stats.append(res[0])
break if len(stats) > 3:
except: del stats[0]
pass redis_client.set(container_id + '_stats', json.dumps(stats), ex=60)
return ''.join(total_data)
try :
socket = container.exec_run([shell_cmd], stdin=True, socket=True, user=user).output._sock
if not cmd.endswith("\n"):
cmd = cmd + "\n"
socket.send(cmd.encode('utf-8'))
data = recv_socket_data(socket, timeout)
socket.close()
return data
except Exception as e: except Exception as e:
print("error - exec_cmd_container: %s" % str(e)) res = {
traceback.print_exc(file=sys.stdout) "type": "danger",
"msg": str(e)
def exec_run_handler(type, output): }
if type == 'generic': print(json.dumps(res, indent=4))
if output.exit_code == 0:
return jsonify(type='success', msg='command completed successfully')
else: else:
return jsonify(type='danger', msg='command failed: ' + output.output.decode('utf-8')) res = {
if type == 'utf8_text_only': "type": "danger",
r = Response(response=output.output.decode('utf-8'), status=200, mimetype="text/plain") "msg": "no or invalid id defined"
r.headers["Content-Type"] = "text/plain; charset=utf-8" }
return r print(json.dumps(res, indent=4))
class GracefulKiller: await asyncio.sleep(wait)
kill_now = False if stop == True:
def __init__(self): # update task was called second time, stop
signal.signal(signal.SIGINT, self.exit_gracefully) containerIds_to_update.remove(container_id)
signal.signal(signal.SIGTERM, self.exit_gracefully) else:
# call update task a second time
await get_container_stats(container_id, wait=0, stop=True)
def exit_gracefully(self, signum, frame):
self.kill_now = True
def create_self_signed_cert(): if os.environ['REDIS_SLAVEOF_IP'] != "":
process = subprocess.Popen( redis_client = redis.Redis(host=os.environ['REDIS_SLAVEOF_IP'], port=os.environ['REDIS_SLAVEOF_PORT'], db=0)
"openssl req -x509 -newkey rsa:4096 -sha256 -days 3650 -nodes -keyout /app/dockerapi_key.pem -out /app/dockerapi_cert.pem -subj /CN=dockerapi/O=mailcow -addext subjectAltName=DNS:dockerapi".split(), else:
stdout = subprocess.PIPE, stderr = subprocess.PIPE, shell=False redis_client = redis.Redis(host='redis-mailcow', port=6379, db=0)
)
process.wait()
def startFlaskAPI(): async_docker_client = aiodocker.Docker(url='unix:///var/run/docker.sock')
create_self_signed_cert()
try:
ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ctx.check_hostname = False
ctx.load_cert_chain(certfile='/app/dockerapi_cert.pem', keyfile='/app/dockerapi_key.pem')
except:
print ("Cannot initialize TLS, retrying in 5s...")
time.sleep(5)
app.run(debug=False, host='0.0.0.0', port=443, threaded=True, ssl_context=ctx)
api.add_resource(containers_get, '/containers/json')
api.add_resource(container_get, '/containers/<string:container_id>/json')
api.add_resource(container_post, '/containers/<string:container_id>/<string:post_action>')
api.add_resource(host_stats_get, '/host/stats')
if __name__ == '__main__':
api_thread = Thread(target=startFlaskAPI)
api_thread.daemon = True
api_thread.start()
killer = GracefulKiller()
while True:
time.sleep(1)
if killer.kill_now:
break
print ("Stopping dockerapi-mailcow")