RK - 2021/22 - LDN10 resitev Starc Aljaz¶
Generiranje certifikatov¶
Pristop izbran v navodilih naloge naroca, da za vsakega klienta generiramo nov certifikat, katerega uporabimo tako na strezniku kot na klientu. Z majhnim stevilom klientov (kot imamo v nalogi) je to upravljivo, a vecjim stevilom (kot imamo v real-world sistemih) postane konstantno dodajanje / odstranjevanje certifikatov iz streznika zelo zamudno. Zato je standardna praksa to, da se uporablja en "root" certifikat s katerim se "podpise" druge certifikate, ki jih uporabljajo klienti. Ti "client" certifikati so potem (posredno) upostevani kot "trusted", saj so bili podpisani z "root" certifikatom, kateremu mi zaupamo.
Generiranje certifikatov¶
- Razlika med
.key
,.csr
,.pem
in.crt
datotekami - crypto.stackexchange.com - Kako generirati certifikate podpisane z root certifikatom - stackoverflow.com
openssl genrsa -out rootCA.key 2048
openssl req -x509 -new -key rootCA.key -days 3650 -out rootCA.pem
export CLIENT_NAME=client1 # Definiraj uporabnisko ime klienta
openssl genrsa -out "$CLIENT_NAME.key" 2048
openssl req -new -key "$CLIENT_NAME.key" -out "$CLIENT_NAME.csr"
openssl x509 -req -days 365 -CA rootCA.pem -CAkey rootCA.key -CAcreateserial -CAserial serial -in "$CLIENT_NAME.csr" -out "$CLIENT_NAME.pem"
Pre-generirani certifikati: - client1.key - client1.pem - client2.key - client2.pem - rootCA.key - rootCA.pem
Details¶
- Uporabnisko ime je na strezniski strani izlusceno iz vrednosti polja
CN
v certifikatu s katerim se je klient povezal. S tem lahko potrjujemo avtenticnost uporabnika, saj se lahko z danim uporabniskim imenom poveze le uporabnik, ki ima certifikat - izdan iz nase stran ter podpisan z nasim korenskim certifikatom - z danim uporabniskim imenom. Tko da ja... dokaj secure. - certifikati morajo biti v isti map kot python file-i
- server potrebuje datoteki
rootCA.key
terrootCA.pem
- client potrebuje datoteke
rootCA.pem
,<client>.key
in<client>.pem
- aplikacija deluje tudi preko interneta
- Default port je
1234
. Uporabljen port se lahko definira z environment variable-omPORT
- Default host je
127.0.0.1
. Spremenjen je lahko z spremembo env varHOST
- Default port je
Uporaba¶
- Generiraj certifikate
- Zaženi streznik z ukazom
./server.py
- Zaženi client z ukazom
./client.py
- Vpisi ime certifikata - ime datoteke datoteke, brez extension-a
- Chat and stuff
- Lakho napises message, ki bo broadcast-an vsem klientom
- Uporabi ukaz
/help
za izpis opisov ukazov
Izvorna koda¶
Client
Na klientu uporabimo kientov certifikat. Uporabljen certifikat je izbran na podlagi vpisanega uporabniskega imena - ce je vpisano ime client1
, bosta uporabljena ./client1.key
ter ./client1.pem
#!/usr/bin/python3
import socket as Socket
import os as Os
import threading as Threading
import datetime as Datetime
import json as Json
import time as Time
import sys as Sys
import re as Re
import ssl as Ssl
import OpenSSL as OpenSSL
from typing import Dict
SOCKET_PORT = int(Os.getenv('PORT', 1234))
SOCKET_HOST = Os.getenv('HOST', "127.0.0.1")
TERM_COLS, TERM_ROWS = Os.get_terminal_size()
socket: Socket.socket = None
certificate = None
def log (level: str, line: str, prefix: str = ""):
colors = {
"info": u"\u001b[34m",
"success": u"\u001b[32m",
"warn": u"\u001b[33m",
"error": u"\u001b[31m",
"log": u"\u001b[37m"
}
print(u"%s%-12s [%s%s\u001b[0m] %s" % (
prefix,
Datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
colors[level],
level.upper().center(7),
line
), flush=True)
def receiver(socket: Socket.socket):
rfile = socket.makefile()
while True:
try:
raw = rfile.readline()
if not raw:
continue
data: Dict = Json.loads(raw)
if data.get("action") == "message" or data.get("action") == "dm":
print("\033[%d;%dH[%12s%s %s" % (TERM_ROWS - 1, 0, data.get("from"), ">" if data.get("action") == "dm" else "]", data.get("data")), flush=True)
print("[%12s] " % certificate.get_subject().CN, flush=True)
print("\033[%d;%dH" % (TERM_ROWS - 1, 16), end='', flush=True)
except Exception as e:
print("Error")
print(e)
pass
def send(data: Dict):
bdata = bytes(Json.dumps(data) + "\n", "UTF-8")
socket.send(bdata)
def command(msg: str):
cmd, *args = msg[1:].split(" ")
if cmd == "help":
print("""
\u001b[33mCommands\u001b[0m:
\u001b[34m/help\u001b[0m
Display this help menu
\u001b[34m/msg\u001b[0m <\u001b[34msession\u001b[0m> <\u001b[34mcontent\u001b[0m>
Send a direct message to a specific session
<\u001b[34msession\u001b[0m> is the session identificator. You can get list of them with /list
<\u001b[34mcontent\u001b[0m> is your message content
""")
elif cmd == "msg":
send({ "action": "dm", "to": args[0], "data": " ".join(args[1:]) })
else:
print("\u001b[31mERROR!\u001b[0m Invalid command. Try \u001b[34m/help\u001b[0m for a list of commands")
socket_ssl = None
try:
while not certificate or not socket_ssl:
print(chr(27) + "[2J")
print("\033[%d;%dH" % (TERM_ROWS / 2, TERM_COLS / 2 - 30), flush=True, end="")
vpis = input("Certificate name (client1): ")
if Os.path.exists(vpis + ".key") and Os.path.exists(vpis + ".pem"):
socket_ssl = Ssl.SSLContext(Ssl.PROTOCOL_TLSv1_2)
socket_ssl.verify_mode = Ssl.CERT_REQUIRED
socket_ssl.load_cert_chain(certfile = vpis + ".pem", keyfile = vpis + ".key")
socket_ssl.load_verify_locations('./rootCA.pem')
socket_ssl.set_ciphers('ECDHE-RSA-AES128-GCM-SHA256')
with open(vpis + ".pem", "r") as f:
certificate = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, f.read())
print(chr(27) + "[2J")
except KeyboardInterrupt:
print()
Sys.exit()
while True:
try:
print("\033[%d;%dH" % (TERM_ROWS - 1, 0), flush=True)
print("[ \u001b[34msystem\u001b[0m] Use \u001b[34m/help\u001b[0m for a list of commands")
print("[ \u001b[34msystem\u001b[0m] connecting to chat server ... ", end="", flush=True)
socket = socket_ssl.wrap_socket(Socket.socket(Socket.AF_INET, Socket.SOCK_STREAM))
connected = False
while not connected:
try:
socket.connect((SOCKET_HOST, SOCKET_PORT))
connected = True
except Exception as e:
print(e)
pass
Time.sleep(1.0)
print("\u001b[32mCONNECTED\u001b[0m!")
thread = Threading.Thread(target=receiver, args=(socket,))
thread.daemon = True
thread.start()
while True:
try:
print("[%12s] " % certificate.get_subject().CN)
print("\033[%d;%dH" % (TERM_ROWS - 1, 16), end='', flush=True)
vpis = input("")
if not vpis:
print ("\033[A" + " " * TERM_COLS + "\033[A", flush=True)
continue
if vpis.startswith("/"):
command(vpis)
else:
send({ "action": "message", "data": vpis })
except Exception as e:
log("error", e)
print("Something went wrong! Restarting...")
break
except KeyboardInterrupt:
print()
Sys.exit()
except Exception as e:
print(e)
Sys.exit()
Server
Na strezniku uporabimo samo nas root certifikat
#!/usr/bin/python3
import socket as Socket
import os as Os
import threading as Threading
import datetime as Datetime
import json as Json
import time as Time
import ssl as Ssl
import OpenSSL as OpenSSL
from typing import Dict, Set, TextIO
SOCKET_PORT = int(Os.getenv('PORT', 1234))
SOCKET_BIND = Os.getenv('BIND', "127.0.0.1")
def log (level: str, line: str, prefix: str = ""):
colors = {
"info": u"\u001b[34m",
"success": u"\u001b[32m",
"warn": u"\u001b[33m",
"error": u"\u001b[31m",
"log": u"\u001b[37m"
}
print(u"%s%-12s [%s%s\u001b[0m] %s" % (
prefix,
Datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
colors[level],
level.upper().center(7),
line
), flush=True)
class Client():
socket: Socket.socket = None
rfile: TextIO = None
address = None
thread: Threading.Thread = None
cert = None
def __init__(self, socket: Socket.socket, address) -> None:
self.socket = socket
self.address = address
self.rfile = socket.makefile()
self.cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, socket.getpeercert(True))
self.thread = Threading.Thread(target=self.entrypoint)
self.thread.daemon = True
self.thread.start()
def entrypoint (self):
"""
The client thread entrypoint
"""
log("log", "[client ] %s connected " % self.cert.get_subject().CN)
emptyCount = 0
while True:
try:
content = self.rfile.readline()
if not content:
emptyCount += 1
if emptyCount > 10: break
continue
emptyCount = 0
data: Dict = Json.loads(content)
if (data.get('action') == 'message'):
log("log", "[public ] %s: %s " % (self.cert.get_subject().CN, data.get('data')))
for client in clients:
if client.cert.get_subject().CN != self.cert.get_subject().CN:
client.send({ "action": "message", "from": self.cert.get_subject().CN, "data": data.get('data') })
if (data.get('action') == 'dm'):
log("log", "[private] %s -> %s: %s " % (self.cert.get_subject().CN, data.get('to'), data.get('data')))
for client in clients:
if client.cert.get_subject().CN == data.get('to'):
client.send({ "action": "dm", "from": self.cert.get_subject().CN, "data": data.get('data') })
except:
pass
self.close()
def send(self, data: Dict):
"""
Send data to client
"""
bdata = bytes(Json.dumps(data) + "\n", "UTF-8")
self.socket.send(bdata)
def close(self):
"""
Gracefully close the client socket
"""
with server_lock:
log("log", "[client] %s closing ..." % self.cert.get_subject().CN)
self.socket.close()
clients.remove(self)
log("warn", "[client] %s closed" % self.cert.get_subject().CN)
def setup_SSL_context():
context = Ssl.SSLContext(Ssl.PROTOCOL_TLSv1_2)
context.verify_mode = Ssl.CERT_REQUIRED
context.load_cert_chain(certfile="./rootCA.pem", keyfile="./rootCA.key")
context.load_verify_locations('./rootCA.pem')
context.set_ciphers('ECDHE-RSA-AES128-GCM-SHA256')
return context
log("info", "[system ] Starting server on port %d ..." % SOCKET_PORT)
clients: Set[Client] = set()
server_lock = Threading.Lock()
server_ssl = setup_SSL_context()
server_socket = server_ssl.wrap_socket(Socket.socket(Socket.AF_INET, Socket.SOCK_STREAM))
server_socket.bind((SOCKET_BIND, SOCKET_PORT))
server_socket.listen(10)
log("success", "[system ] Started server on port %d ..." % SOCKET_PORT)
log("info", "[system ] Starting healthchecker")
def healthchecker():
while True:
for client in list(clients):
try: client.send({"action": "healthcheck"})
except: pass
Time.sleep(5)
healthchecker_thread = Threading.Thread(target=healthchecker)
healthchecker_thread.daemon = True
healthchecker_thread.start()
log("success", "[system ] Started healthchecker")
while True:
try:
socket, address = server_socket.accept()
with server_lock:
client = Client(socket, address)
clients.add(client)
except KeyboardInterrupt:
break
except:
pass
log("warn", "[system ] Closing server ...", "\n")
for client in list(clients):
client.close()
server_socket.close()
log("info", "[system ] Server closed")