Praca na socketach i wątkach - Bad File Descriptor

0

Witajcie

Mam dość trudny do rozwiązania problem (przynajmniej dla mnie). Otóż piszę aktualnie projekt serwera FTP i klienta w oparciu o niskopoziomowe sockety oraz protokół TCP. Serwer oparty jest o wątki z racji tego, że musi obsługiwać x klientów, a nie tylko jednego. Tutaj pojawia się problem.

Z dokumentu RFC serwera FTP (https://tools.ietf.org/html/rfc959) wynika, że komenda quit powoduje zamknięcie socketu kontrolnego (tego, który odpowiada za przesył odpowiedzi na komendy ze strony klienta, socket ten nie obsługuje przesyłania danych takich jak np. pliki). Po zamknięciu socketu kontrolnego serwer oczywiście musi działać dalej, ale niestety następuje błąd Bad file descriptor przy instrukcji self.sock.listen(1) co oznacza, że serwer próbuje nasłuchiwać na już zamkniętym sockecie. Logicznie rzecz biorąc wystarczyłoby napisać coś takiego:

import socket

try:
    self.control_sock.listen(1)
except OSError:
    self.control_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.HOST = socket.gethostbyname(socket.gethostname())
    self.PORT = 5000
    self.control_sock.bind((self.HOST, self.PORT))
    self.control_sock.listen(1)

Ale ten kod również nie rozwiązuje problemu, ponieważ wyrzuca exception Adresses already in use właściwie nie wiem z jakiego powodu, ponieważ po komendzie quit sockety po stronie serwera i po stronie klienta są zamykane toteż host i port powinny się "zwolnić".

Oto kody:

SERWER

#!/usr/bin/python3

from threading import Thread
import getpass
import socket
import platform
import os
from os import path
import sys

class waitForClients(Thread):
	def __init__(self):
		Thread.__init__(self)
		self.control_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		self.HOST = socket.gethostbyname(socket.gethostname())
		self.PORT = 5000
		self.control_sock.bind((self.HOST, self.PORT))

	def run(self):
		while True:
			try:
				self.control_sock.listen(1)
			except OSError:
				self.__init__()
				self.control_sock.listen(1)
			talk_with_client = receiveCommandsFromClient(self.control_sock, self.control_sock.accept())
			talk_with_client.start()

class createDataConnection():
	def __init__(self):
		self.data_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		self.data_HOST = '127.0.1.1'
		self.data_PORT = 5010
		self.data_sock.connect((self.data_HOST, self.data_PORT))
		print("Polaczony!")

class receiveCommandsFromClient(Thread):
	def __init__(self, sock, connection_tuple):
		Thread.__init__(self)
		self.control_sock = sock
		self.conn, self.addr = connection_tuple
		self.logged_in = False
		self.mode = 'ascii'
		self.username = ''
		self.home_directory = os.path.abspath('/')				# katalogiem domowym jest /
		self.current_working_directory = os.path.abspath('/')	# poczatkowo katalogiem roboczym jest /
		self.passive_mode = False
		self.commands =	{
							"type": self.TYPE,
				 	  		"user": self.USER,
							"cd":	self.CHANGE_DIR,
							"quit": self.QUIT
						}

	def run(self):
		self.welcome_msg()
		self.data_connection = createDataConnection()
		self.login()
		self.logged_in = self.check_identification() 
		while True:
			self.data = self.receive_data()
			self.command = self.data.split(" ")[0]
			if not self.data:
				break
			if self.logged_in:
				try:
					self.commands.get(self.command)()
				except TypeError:
					self.conn.send("Invalid command.".encode())
			else:
				if self.command == 'user':
					self.commands.get(self.command)()
				else:
					self.conn.send("530 Please Login with USER and PASS.\nLogin failed.".encode())
			
	def welcome_msg(self):
		self.conn.send("Connected to the server.\nServer is ready.\n".encode())

	def login(self):
		self.username = self.conn.recv(1024).decode()
		self.conn.send("331 Password required for USER.\n".encode())

	def required_msg(self):
		self.conn.send("331 Password required for USER.\n".encode())

	def correct_ident_msg(self):
		self.conn.send(bytes('230- ----------------------------------------------------------\n' 
			      '230- FTP server by Kamil Szpakowski\n' 				 
			      '230- ----------------------------------------------------------\n' 
			      'Remote system type is ' + platform.system() + '.', "utf-8"))

	def incorrect_ident_msg(self):
		self.conn.send("530 Please Login with USER and PASS.\nLogin failed.".encode())

	
	def check_identification(self):
		if self.username == 'anonymous':
			self.correct_ident_msg()
			return True
		else:
			self.incorrect_ident_msg()
			return False


	def receive_data(self):
		try:
			data = self.conn.recv(1024).decode()
			return data
		except ConnectionResetError:
			self.data_connection.data_sock.close()

	def take_parameter(self):
		try:
			self.param = self.data.split(" ")[1]
		except IndexError:
			self.param = ''

	def TYPE(self):
		self.take_parameter()
		if self.param == 'ascii':
			self.mode = self.param
			self.conn.send("200 TYPE set to A.".encode())
		elif self.param == 'binary':
			self.mode = self.param
			self.conn.send("200 TYPE set to I.".encode())
		elif self.param == '' and self.mode == 'ascii':
			self.conn.send(bytes('Using ' + self.mode + ' mode to transfer files.', 'utf-8'))
		elif self.param == '' and self.mode == 'binary':
			self.conn.send(bytes('Using ' + self.mode + ' mode to transfer files.', 'utf-8'))

	def USER(self):
		self.take_parameter()
		if self.logged_in:
			self.conn.send("503 You are already logged in!\nLogin failed.".encode())
		else:
			if not self.param:
				self.conn.send('usage: user <login>'.encode())
			else:
				self.username = self.param
				self.required_msg()
				self.logged_in = self.check_identification()

	def CHANGE_DIR(self):
		self.take_parameter()
		new_path = self.param
		first_char_of_new_path = new_path[0]
		if first_char_of_new_path == '/' and os.path.isdir(new_path):
			os.chdir(new_path)
			self.current_working_directory = os.path.join(new_path)
			self.conn.send("250 CWD command successful.".encode())
		elif first_char_of_new_path != '/' and os.path.isdir(new_path):
			os.chdir(os.path.join(self.current_working_directory, new_path))
			self.current_working_directory = os.path.join(self.current_working_directory, new_path)
			self.conn.send("250 CWD command successful.".encode())
		elif not os.path.isdir(new_path):
			self.conn.send(bytes("550 " + new_path + ": No such file or directory.", "utf-8"))

	def QUIT(self):
		self.control_sock.close()
		self.data_connection.data_sock.close()
	
if __name__ == "__main__":
	th = waitForClients()
	th.start()

KLIENT

#!/usr/bin/python3
import socket
import getpass
import threading
import sys

class talkWithTheServer(threading.Thread):
	def __init__(self, control_sock, data_sock):
		threading.Thread.__init__(self)
		self.control_sock = control_sock
		self.data_sock = data_sock
		self.login = ''
		self.client_side_effects = {	
										"quit": self.QUIT,
										"user": self.rest_of_authorization_processes
								   };

	def receive_first_msg(self):
		welcome_msg = self.control_sock.recv(1024)
		print(welcome_msg.decode())
	
	def login_auth_to_server(self):
		self.login = input("Name: ")
		self.control_sock.send(self.login.encode())

	def password_auth_to_server(self):
		getpass.getpass()
	
	def receive_required_auth_data(self):
		user_auth_data = self.control_sock.recv(30)
		print(user_auth_data.decode())

	def receive_incorrect_auth_data(self):
		incorrect_auth_data = self.control_sock.recv(55).decode()
		incorrect_auth_data = incorrect_auth_data.split("\n")[1:]
		print('\n'.join(incorrect_auth_data))

	def receive_correct_auth_data(self):
		correct_auth_data = self.control_sock.recv(1024)
		print(correct_auth_data.decode())

	def send_data(self):
		self.data_to_send = input('ftp> ')
		try:
			self.command = self.data_to_send.split(" ")[0]
		except IndexError:
			self.command = ''
		self.control_sock.send(self.data_to_send.encode())
		
	def receive_and_send_data(self):
		while True:
			data = self.control_sock.recv(1024)
			print(data.decode())
			self.send_data()
			self.check_for_client_side_command()
			if not data:
				break

	def receive_data(self):
		self.control_sock.send(self.data_to_send.encode())
		data = self.control_sock.recv(1024)
		print(data.decode())

	def check_for_client_side_command(self):
		try:
			self.client_side_effects.get(self.command)()
		except TypeError:
			pass

	def first_authorization_process(self):
		self.login_auth_to_server()
		self.receive_required_auth_data()	
		self.password_auth_to_server()									
		if self.login == 'anonymous':									
			self.receive_correct_auth_data()
			self.send_data()
			self.check_for_client_side_command()
		else:
			self.receive_incorrect_auth_data()
			self.send_data()

	def rest_of_authorization_processes(self):
		try:
			self.param = self.data_to_send.split(" ")[1]
		except IndexError:
			self.param = ''
		if self.login == 'anonymous':
			self.receive_correct_auth_data()
			self.send_data()
		elif self.param == 'anonymous':
			self.receive_required_auth_data()
			self.password_auth_to_server()
			self.receive_correct_auth_data()
			self.login = self.param
			self.send_data()
		else:
			self.receive_required_auth_data()
			self.password_auth_to_server()
			self.receive_incorrect_auth_data()
			self.send_data()

	def QUIT(self):
		self.control_sock.close()
		self.data_sock.close()
		sys.exit()


	def run(self):
		self.receive_first_msg()
		self.first_authorization_process()
		self.receive_and_send_data()
	
class createControlConnection():
	def __init__(self):
		self.control_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		self.control_HOST = '127.0.1.1'
		self.control_PORT = 5000
		self.control_sock.connect((self.control_HOST, self.control_PORT))

class createDataConnection(createControlConnection, threading.Thread):
	def __init__(self, control_sock):
		threading.Thread.__init__(self)
		self.control_sock = control_sock
		self.data_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		self.data_HOST = ''
		self.data_PORT = 5010
		self.data_sock.bind((self.data_HOST, self.data_PORT))
		self.data_sock.listen(1)
		self.data_conn, self.data_addr = self.data_sock.accept()
			
		
if __name__ == "__main__":
	new_control_connection = createControlConnection()
	data_connection = createDataConnection(new_control_connection.control_sock)
	talk_with_the_server = talkWithTheServer(new_control_connection.control_sock, data_connection.data_sock)
	talk_with_the_server.start()

Staram się jak najmniej prosić o pomoc, ale ten problem mnie przerósł. Byłbym wdzięczny za jakąkolwiek pomoc.

0

Problem został przeze mnie rozwiązany. Jeśli ktoś miałby podobny to oto rozwiązanie. Nie wiem na ile ono jest optymalne, aczkolwiek działa prawidłowo i dostatecznie szybko. Zastanawiałem się jak można to zrobić inaczej, ale ostatecznie wybrałem najprostsze rozwiązanie tego problemu.

class waitForClients(Thread):
    def __init__(self):
        Thread.__init__(self)
        self.control_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.HOST = socket.gethostbyname(socket.gethostname())
        self.PORT = 5000
        self.control_sock.bind((self.HOST, self.PORT))
 
    def run(self):
        while True:
            try:
                self.control_sock.listen(1)
            except OSError:
                self.__init__()
                self.control_sock.listen(1)
            talk_with_client = receiveCommandsFromClient(self.control_sock, self.control_sock.accept())
            talk_with_client.start()

Metodę run zmodyfikowałem do takiej postaci:


	def run(self):
		while True:
			try:
				self.control_sock.listen(1)
			except OSError:
				while True:
					try:
						self.__init__()
						break
					except:
						continue
			    self.control_sock.listen(1)
			talk_with_client = receiveCommandsFromClient(self.control_sock, self.control_sock.accept())
			talk_with_client.start()

Gdy program wyrzuci wyjątek Bad file descriptor to serwer stara się stworzyć nowy socket. Jeśli nie uda mu się tego zrobić z powodu wyjątku Adresses already in use to tworzy sockety dopóki wyjątek przestanie istnieć. Gdy uda mu się sockety stworzyć ponownie na nich nasłuchuje.

1 użytkowników online, w tym zalogowanych: 0, gości: 1