# uncompyle6 version 3.9.2
# Python bytecode version base 3.7.0 (3394)
# Decompiled from: Python 3.8.19 (default, Mar 20 2024, 15:27:52)
# [Clang 14.0.6 ]
# Embedded file name: /var/user/app/device_supervisorbak/device_supervisor/lib/sftpFunc.py
# Compiled at: 2024-04-18 03:12:57
# Size of source mod 2**32: 3267 bytes
"""Thin ctypes wrappers around the native ``libsftpFunc.so`` SFTP helper.

Every wrapper encodes its string arguments as UTF-8, calls the matching
native function and logs the outcome through the project logger.
"""
import ctypes

from common.Logger import logger

# Loaded once at import time; every wrapper below calls into this handle.
sftpFunc = ctypes.cdll.LoadLibrary("libsftpFunc.so")

# Log text for each status code that print_log() knows about.
_ERROR_MESSAGES = {
    1000: "get connect info error",
    1001: "libssh2 init failed",
    1002: "socket failed",
    1003: "connect sftp server failed",
    1004: "libssh2 session init failed",
    1005: "libssh2 sftp init failed",
    1006: "Unable to open remote file with SFTP",
    1007: "Unable to open local file",
    1008: "upload_file bytes_written != bytes_read",
    1009: "Unable to delete file with SFTP",
}

# scan_directory() reports failures as text in its output; any of these
# substrings marks the returned payload as an error message, not a listing.
_SCAN_ERROR_MARKERS = (
    "libssh2 init failed",
    "socket failed",
    "connect sftp server failed",
    "libssh2 session init failed",
    "Failure establishing SSH session:",
    "libssh2 sftp init failed",
    "Unable to open dir with SFTP",
)

# Size in bytes (1 MiB) of the buffer handed to the native scan_directory().
_SCAN_BUFFER_SIZE = 1048576


def split_string_into_list(s):
    """Return the newline-separated pieces of *s* as a list."""
    return s.split("\n")


def print_log(result):
    """Log the meaning of a status code returned by the native library.

    ``0`` is logged at debug level as success. Codes 1000-1009 are logged
    at error level with their specific message. Any other value is logged
    as an SSH session failure together with the numeric code.
    """
    if result == 0:
        logger.debug("Success")
        return
    message = _ERROR_MESSAGES.get(result)
    if message is None:
        message = "Failure establishing SSH session: %d" % result
    logger.error(message)


def set_sftpConnect_info(AuthType, hostname, port=22, name=None, passwd='123456'):
    """Hand the connection parameters to the native library.

    Args:
        AuthType: Integer passed unchanged to the native ``set_connect_info``
            (its meaning is defined by the native library).
        hostname: Server host as ``str``.
        port: Server port, 22 by default.
        name: User name as ``str``.
        passwd: Password as ``str``.

    Returns:
        The integer status code of the native call (also logged).

    NOTE(review): ``name`` defaults to ``None`` but is encoded
    unconditionally, so omitting it raises ``AttributeError``; callers must
    always supply a user name.
    """
    sftpFunc.set_connect_info.argtypes = [
        ctypes.c_int,
        ctypes.c_char_p,
        ctypes.c_int,
        ctypes.c_char_p,
        ctypes.c_char_p,
    ]
    sftpFunc.set_connect_info.restype = ctypes.c_int
    result = sftpFunc.set_connect_info(
        AuthType,
        hostname.encode("utf-8"),
        port,
        name.encode("utf-8"),
        passwd.encode("utf-8"),
    )
    print_log(result)
    return result


def sftp_upload_file(remotename, localname):
    """Call the native ``upload_file`` and return its logged status code.

    Args:
        remotename: Remote path as ``str``.
        localname: Local path as ``str``.
    """
    sftpFunc.upload_file.argtypes = [ctypes.c_char_p, ctypes.c_char_p]
    sftpFunc.upload_file.restype = ctypes.c_int
    result = sftpFunc.upload_file(
        remotename.encode("utf-8"), localname.encode("utf-8")
    )
    print_log(result)
    return result


def sftp_download_file(remotename, localname):
    """Call the native ``download_file`` and return its logged status code.

    Args:
        remotename: Remote path as ``str``.
        localname: Local path as ``str``.
    """
    sftpFunc.download_file.argtypes = [ctypes.c_char_p, ctypes.c_char_p]
    sftpFunc.download_file.restype = ctypes.c_int
    result = sftpFunc.download_file(
        remotename.encode("utf-8"), localname.encode("utf-8")
    )
    print_log(result)
    return result


def sftp_remove_file(remotename):
    """Call the native ``remove_file`` and return its logged status code.

    Args:
        remotename: Remote path as ``str``.
    """
    sftpFunc.remove_file.argtypes = [ctypes.c_char_p]
    sftpFunc.remove_file.restype = ctypes.c_int
    result = sftpFunc.remove_file(remotename.encode("utf-8"))
    print_log(result)
    return result


def sftp_scan_directory(dirpath):
    """List the entries the native ``scan_directory`` reports for *dirpath*.

    Args:
        dirpath: Directory path as ``str``.

    Returns:
        A list of entry names with ``.``, ``..`` and empty names removed,
        or ``None`` when the native call returns NULL or its output
        contains one of the known error messages (the failure is logged).
    """
    buffer = ctypes.create_string_buffer(_SCAN_BUFFER_SIZE)
    sftpFunc.scan_directory.argtypes = [
        ctypes.c_char_p,
        ctypes.c_char_p,
        ctypes.c_int,
    ]
    sftpFunc.scan_directory.restype = ctypes.c_char_p
    result = sftpFunc.scan_directory(
        dirpath.encode("utf-8"), buffer, _SCAN_BUFFER_SIZE
    )

    # ctypes maps a NULL char* return to None; guard before decoding.
    if result is None:
        logger.error("scan_directory returned no data")
        return None

    # Decode once and work on text from here on.
    listing = result.decode()
    if any(marker in listing for marker in _SCAN_ERROR_MARKERS):
        logger.error(listing)
        return None

    dir_list = [
        entry
        for entry in split_string_into_list(listing)
        if entry and entry not in (".", "..")
    ]
    logger.debug(dir_list)
    return dir_list