mirror of
https://github.com/BreizhHardware/py_CIPA3.git
synced 2026-01-18 16:37:30 +01:00
add audio LSB steganography implementation and improve client/server message handling
This commit is contained in:
@@ -1,36 +1,38 @@
|
||||
import socket
|
||||
import threading
|
||||
|
||||
|
||||
def receive_messages(client_socket):
|
||||
try:
|
||||
while True:
|
||||
msg = client_socket.recv(1024)
|
||||
if msg.decode() == 'stop':
|
||||
print('Received stop signal. Closing connection.')
|
||||
if msg.decode() == "stop":
|
||||
print("Received stop signal. Closing connection.")
|
||||
break
|
||||
else:
|
||||
print(f'Message received: {msg.decode()}')
|
||||
print(f"Message received: {msg.decode()}")
|
||||
except Exception as e:
|
||||
print(f'Error: {e}')
|
||||
print(f"Error: {e}")
|
||||
finally:
|
||||
client_socket.close()
|
||||
|
||||
|
||||
client_socket = socket.socket()
|
||||
client_socket.connect(('localhost', 12345))
|
||||
client_socket.send('Coucou'.encode())
|
||||
client_socket.connect(("localhost", 12345))
|
||||
client_socket.send("Coucou".encode())
|
||||
|
||||
# Ask the user for messages to send
|
||||
try:
|
||||
while True:
|
||||
msg = input('Message: ')
|
||||
msg = input("Message: ")
|
||||
client_socket.send(msg.encode())
|
||||
if msg == 'stop':
|
||||
if msg == "stop":
|
||||
break
|
||||
except KeyboardInterrupt:
|
||||
print('Connection closed')
|
||||
print("Connection closed")
|
||||
finally:
|
||||
client_socket.close()
|
||||
|
||||
receive_thread = threading.Thread(target=receive_messages, args=(client_socket,))
|
||||
receive_thread.start()
|
||||
receive_thread.join()
|
||||
receive_thread.join()
|
||||
|
||||
@@ -3,50 +3,53 @@ import threading
|
||||
|
||||
clients = []
|
||||
|
||||
|
||||
def send_to_all(clients, message):
|
||||
for client in clients:
|
||||
try:
|
||||
client.send(message)
|
||||
except Exception as e:
|
||||
print(f'Error sending message to a client: {e}')
|
||||
print(f"Error sending message to a client: {e}")
|
||||
clients.remove(client)
|
||||
|
||||
|
||||
def client_thread(client_socket, clients):
|
||||
try:
|
||||
while True:
|
||||
msg = client_socket.recv(1024)
|
||||
if msg:
|
||||
print(f'Message received: {msg.decode()}')
|
||||
print(f"Message received: {msg.decode()}")
|
||||
send_to_all(clients, msg)
|
||||
else:
|
||||
break
|
||||
except Exception as e:
|
||||
print(f'Error: {e}')
|
||||
print(f"Error: {e}")
|
||||
finally:
|
||||
client_socket.close()
|
||||
clients.remove(client_socket)
|
||||
|
||||
|
||||
server_socket = socket.socket()
|
||||
server_socket.bind(('', 12345))
|
||||
server_socket.bind(("", 12345))
|
||||
server_socket.listen()
|
||||
|
||||
print('Server is listening for connections...')
|
||||
print("Server is listening for connections...")
|
||||
|
||||
try:
|
||||
while True:
|
||||
client_socket, addr = server_socket.accept()
|
||||
print(f'Connection from {addr}')
|
||||
print(f"Connection from {addr}")
|
||||
clients.append(client_socket)
|
||||
thread = threading.Thread(target=client_thread, args=(client_socket, clients))
|
||||
thread.start()
|
||||
except KeyboardInterrupt:
|
||||
print('Server stopped')
|
||||
print("Server stopped")
|
||||
finally:
|
||||
for client in clients:
|
||||
try:
|
||||
client.shutdown(socket.SHUT_RDWR)
|
||||
except Exception as e:
|
||||
print(f'Error shutting down client socket: {e}')
|
||||
print(f"Error shutting down client socket: {e}")
|
||||
client.close()
|
||||
server_socket.close()
|
||||
print('Server closed')
|
||||
print("Server closed")
|
||||
|
||||
64
TP7/7.4/audio-lsb-steganography.py
Normal file
64
TP7/7.4/audio-lsb-steganography.py
Normal file
@@ -0,0 +1,64 @@
|
||||
import struct
|
||||
import wave
|
||||
|
||||
|
||||
def payload(msg: bytes) -> bytes:
|
||||
message_length = len(msg)
|
||||
message_length_bytes = struct.pack(">I", message_length)
|
||||
return message_length_bytes + msg
|
||||
|
||||
|
||||
def extract_bit(byte: int, n: int) -> int:
|
||||
return (byte >> n) & 1
|
||||
|
||||
|
||||
def lsb_to_byte(sequence: bytes) -> int:
|
||||
byte = 0
|
||||
for i in range(8):
|
||||
byte |= (sequence[i] & 1) << i
|
||||
return byte
|
||||
|
||||
|
||||
def encode(msg: bytes, input_file: str, output_file: str):
|
||||
with wave.open(input_file, "rb") as audio:
|
||||
frames = bytearray(list(audio.readframes(audio.getnframes())))
|
||||
|
||||
msg_with_length = payload(msg)
|
||||
message_bin = "".join(format(byte, "08b") for byte in msg_with_length)
|
||||
|
||||
if len(message_bin) > len(frames):
|
||||
raise ValueError("The input file is not large enough to hold the message.")
|
||||
|
||||
for i in range(len(message_bin)):
|
||||
frames[i] = (frames[i] & 0xFE) | int(message_bin[i])
|
||||
|
||||
with wave.open(output_file, "wb") as audio_result:
|
||||
audio_result.setparams(audio.getparams())
|
||||
audio_result.writeframes(bytes(frames))
|
||||
|
||||
|
||||
def decode(input_file: str) -> bytes:
|
||||
with wave.open(input_file, "rb") as audio:
|
||||
frames = bytearray(list(audio.readframes(audio.getnframes())))
|
||||
|
||||
length_bits = [frames[i] & 1 for i in range(32)]
|
||||
length_bytes = bytearray()
|
||||
for i in range(0, 32, 8):
|
||||
length_bytes.append(lsb_to_byte(length_bits[i : i + 8]))
|
||||
message_length = struct.unpack(">I", length_bytes)[0]
|
||||
|
||||
message_bits = [frames[i] & 1 for i in range(32, 32 + message_length * 8)]
|
||||
message_bytes = bytearray()
|
||||
for i in range(0, len(message_bits), 8):
|
||||
message_bytes.append(lsb_to_byte(message_bits[i : i + 8]))
|
||||
|
||||
return bytes(message_bytes)
|
||||
|
||||
|
||||
message = b"Hi"
|
||||
input_file = "input.wav"
|
||||
output_file = "output.wav"
|
||||
encode(message, input_file, output_file)
|
||||
|
||||
decoded_message = decode(output_file)
|
||||
print(decoded_message.decode())
|
||||
Reference in New Issue
Block a user