From 7cefd91820451fcf3846505e9d5753811008aa68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?F=C3=A9lix=20MARQUET?= Date: Tue, 25 Feb 2025 10:51:18 +0100 Subject: [PATCH] add audio LSB steganography implementation and improve client/server message handling --- TP7/7.3/client.py | 22 +++++----- TP7/7.3/server.py | 21 +++++----- TP7/7.4/audio-lsb-steganography.py | 64 ++++++++++++++++++++++++++++++ 3 files changed, 88 insertions(+), 19 deletions(-) create mode 100644 TP7/7.4/audio-lsb-steganography.py diff --git a/TP7/7.3/client.py b/TP7/7.3/client.py index ec5e80a..f93ae76 100644 --- a/TP7/7.3/client.py +++ b/TP7/7.3/client.py @@ -1,36 +1,38 @@ import socket import threading + def receive_messages(client_socket): try: while True: msg = client_socket.recv(1024) - if msg.decode() == 'stop': - print('Received stop signal. Closing connection.') + if msg.decode() == "stop": + print("Received stop signal. Closing connection.") break else: - print(f'Message received: {msg.decode()}') + print(f"Message received: {msg.decode()}") except Exception as e: - print(f'Error: {e}') + print(f"Error: {e}") finally: client_socket.close() + client_socket = socket.socket() -client_socket.connect(('localhost', 12345)) -client_socket.send('Coucou'.encode()) +client_socket.connect(("localhost", 12345)) +client_socket.send("Coucou".encode()) # Ask the user for messages to send try: while True: - msg = input('Message: ') + msg = input("Message: ") client_socket.send(msg.encode()) - if msg == 'stop': + if msg == "stop": break except KeyboardInterrupt: - print('Connection closed') + print("Connection closed") finally: client_socket.close() receive_thread = threading.Thread(target=receive_messages, args=(client_socket,)) receive_thread.start() -receive_thread.join() \ No newline at end of file +receive_thread.join() diff --git a/TP7/7.3/server.py b/TP7/7.3/server.py index f148e21..083b7de 100644 --- a/TP7/7.3/server.py +++ b/TP7/7.3/server.py @@ -3,50 +3,53 @@ import threading clients = [] + def send_to_all(clients, message): for client in clients: try: client.send(message) except Exception as e: - print(f'Error sending message to a client: {e}') + print(f"Error sending message to a client: {e}") clients.remove(client) + def client_thread(client_socket, clients): try: while True: msg = client_socket.recv(1024) if msg: - print(f'Message received: {msg.decode()}') + print(f"Message received: {msg.decode()}") send_to_all(clients, msg) else: break except Exception as e: - print(f'Error: {e}') + print(f"Error: {e}") finally: client_socket.close() clients.remove(client_socket) + server_socket = socket.socket() -server_socket.bind(('', 12345)) +server_socket.bind(("", 12345)) server_socket.listen() -print('Server is listening for connections...') +print("Server is listening for connections...") try: while True: client_socket, addr = server_socket.accept() - print(f'Connection from {addr}') + print(f"Connection from {addr}") clients.append(client_socket) thread = threading.Thread(target=client_thread, args=(client_socket, clients)) thread.start() except KeyboardInterrupt: - print('Server stopped') + print("Server stopped") finally: for client in clients: try: client.shutdown(socket.SHUT_RDWR) except Exception as e: - print(f'Error shutting down client socket: {e}') + print(f"Error shutting down client socket: {e}") client.close() server_socket.close() - print('Server closed') \ No newline at end of file + print("Server closed") diff --git a/TP7/7.4/audio-lsb-steganography.py b/TP7/7.4/audio-lsb-steganography.py new file mode 100644 index 0000000..1fb7880 --- /dev/null +++ b/TP7/7.4/audio-lsb-steganography.py @@ -0,0 +1,64 @@ +import struct +import wave + + +def payload(msg: bytes) -> bytes: + message_length = len(msg) + message_length_bytes = struct.pack(">I", message_length) + return message_length_bytes + msg + + +def extract_bit(byte: int, n: int) -> int: + return (byte >> n) & 1 + + +def lsb_to_byte(sequence: bytes) -> int: + byte = 0 + for i in range(8): + byte |= (sequence[i] & 1) << i + return byte + + +def encode(msg: bytes, input_file: str, output_file: str): + with wave.open(input_file, "rb") as audio: + frames = bytearray(list(audio.readframes(audio.getnframes()))) + + msg_with_length = payload(msg) + message_bin = "".join(format(byte, "08b") for byte in msg_with_length) + + if len(message_bin) > len(frames): + raise ValueError("The input file is not large enough to hold the message.") + + for i in range(len(message_bin)): + frames[i] = (frames[i] & 0xFE) | int(message_bin[i]) + + with wave.open(output_file, "wb") as audio_result: + audio_result.setparams(audio.getparams()) + audio_result.writeframes(bytes(frames)) + + +def decode(input_file: str) -> bytes: + with wave.open(input_file, "rb") as audio: + frames = bytearray(list(audio.readframes(audio.getnframes()))) + + length_bits = [frames[i] & 1 for i in range(32)] + length_bytes = bytearray() + for i in range(0, 32, 8): + length_bytes.append(lsb_to_byte(length_bits[i : i + 8])) + message_length = struct.unpack(">I", length_bytes)[0] + + message_bits = [frames[i] & 1 for i in range(32, 32 + message_length * 8)] + message_bytes = bytearray() + for i in range(0, len(message_bits), 8): + message_bytes.append(lsb_to_byte(message_bits[i : i + 8])) + + return bytes(message_bytes) + + +message = b"Hi" +input_file = "input.wav" +output_file = "output.wav" +encode(message, input_file, output_file) + +decoded_message = decode(output_file) +print(decoded_message.decode())