Coverage for kaggle_watchdog.py: 81%

86 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-21 23:06 +0000

1# Copyright 2026 venim1103 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

import os
import subprocess
import sys

17 

# Default repository location, tried when no explicit or env-provided path is given.
REPO_DIR = "/kaggle/working/mini-mamba-agent-1.58b"

# Names of the secrets that must all be resolved by load_runtime_secrets().
REQUIRED_SECRET_KEYS = (
    "HF_TOKEN",
    "WANDB_API_KEY",
    "REPO_ID",
)

# Environment variable consulted for a repository directory override.
REPO_ENV_VAR = "WATCHDOG_REPO_DIR"

21 

22 

def _candidate_repo_dirs(explicit_repo_dir: str | None = None) -> list[str]:
    """Return the ordered, duplicate-free list of directories to try as repo root.

    Priority order: the explicit argument (when truthy), the value of the
    environment variable named by ``REPO_ENV_VAR`` (when set and non-empty),
    ``REPO_DIR``, the current working directory, and finally the directory
    that contains this file.
    """
    optional_dirs = (explicit_repo_dir, os.environ.get(REPO_ENV_VAR))
    fallback_dirs = (
        REPO_DIR,
        os.getcwd(),
        os.path.dirname(os.path.abspath(__file__)),
    )
    unique_dirs: list[str] = []
    for path in (*(entry for entry in optional_dirs if entry), *fallback_dirs):
        # Keep only the first occurrence so higher-priority entries win.
        if path not in unique_dirs:
            unique_dirs.append(path)
    return unique_dirs

35 

36 

def set_repo_directory(repo_dir: str | None = None, *, logger=print) -> bool:
    """Switch to a usable repository directory across Kaggle, Colab, or local setups.

    Args:
        repo_dir: Optional explicit directory to try before any other candidate.
        logger: Callable that receives status messages; defaults to ``print``.

    Returns:
        True once the working directory has been changed to the first usable
        candidate, False when no candidate could be entered.
    """
    for candidate in _candidate_repo_dirs(repo_dir):
        # A path that exists but is a regular file would make os.chdir raise,
        # so require an actual directory rather than mere existence.
        if not os.path.isdir(candidate):
            continue
        try:
            os.chdir(candidate)
        except OSError as exc:
            # E.g. missing search permission: report it and try the next candidate.
            logger(f"Could not enter {candidate}: {exc}")
            continue
        logger(f"Directory set to: {os.getcwd()}")
        return True
    logger(
        "Repository not found! Set WATCHDOG_REPO_DIR or run this script from the cloned repo root."
    )
    return False

48 

49 

def _load_kaggle_secrets() -> dict[str, str]:
    """Fetch every key in ``REQUIRED_SECRET_KEYS`` through ``UserSecretsClient``.

    ``kaggle_secrets`` is imported inside the function, so an import failure
    only surfaces when this provider is actually called.
    """
    from kaggle_secrets import UserSecretsClient

    client = UserSecretsClient()
    secrets: dict[str, str] = {}
    for name in REQUIRED_SECRET_KEYS:
        secrets[name] = client.get_secret(name)
    return secrets

55 

56 

def _load_colab_secrets() -> dict[str, str]:
    """Fetch every key in ``REQUIRED_SECRET_KEYS`` through ``google.colab.userdata``.

    ``google.colab`` is imported inside the function, so an import failure
    only surfaces when this provider is actually called.
    """
    from google.colab import userdata

    secrets: dict[str, str] = {}
    for name in REQUIRED_SECRET_KEYS:
        secrets[name] = userdata.get(name)
    return secrets

61 

62 

def load_runtime_secrets(*, logger=print) -> dict[str, str]:
    """Resolve required secrets from env, then optional notebook providers.

    Environment variables are consulted first. Any keys still missing are
    looked up through each provider in turn (Kaggle secrets, then Colab
    userdata); values found that way are also exported into ``os.environ``.

    Args:
        logger: Callable that receives status messages; defaults to ``print``.

    Returns:
        Mapping of every key in ``REQUIRED_SECRET_KEYS`` to its non-empty value.

    Raises:
        RuntimeError: If any required secret is still missing after all
            providers were tried. The message lists the missing keys and,
            when providers failed, the reason each one failed.
    """
    resolved = {}
    for key in REQUIRED_SECRET_KEYS:
        env_value = os.environ.get(key)
        if env_value:
            resolved[key] = env_value
    missing = [key for key in REQUIRED_SECRET_KEYS if key not in resolved]
    if not missing:
        logger("Using secrets already present in environment variables")
        return resolved

    # Built at call time so the provider functions are looked up on every call.
    providers = (
        ("Kaggle secrets", _load_kaggle_secrets),
        ("Colab userdata", _load_colab_secrets),
    )
    provider_errors = []
    for provider_name, provider_fn in providers:
        try:
            provider_values = provider_fn()
        except Exception as exc:
            # Providers are best-effort (their SDK is usually absent outside its
            # own platform), but remember why each failed so the final error
            # is diagnosable instead of silently discarding the cause.
            provider_errors.append(f"{provider_name}: {type(exc).__name__}: {exc}")
            continue
        for key in missing:
            value = provider_values.get(key)
            if value:
                resolved[key] = value
                os.environ[key] = value
        missing = [key for key in REQUIRED_SECRET_KEYS if key not in resolved]
        if not missing:
            logger(f"Loaded {', '.join(REQUIRED_SECRET_KEYS)} from {provider_name}")
            return resolved

    details = ""
    if provider_errors:
        details = " Provider errors: " + "; ".join(provider_errors)
    raise RuntimeError(
        f"Missing required secrets: {', '.join(missing)}. "
        "Provide them via environment variables or notebook secret manager."
        + details
    )

96 

97 

def load_kaggle_secrets(*, logger=print) -> dict[str, str]:
    """Legacy-named entry point that delegates to ``load_runtime_secrets``."""
    secrets = load_runtime_secrets(logger=logger)
    return secrets

101 

102 

def start_background_sync(*, log_path: str = "sync_logs.txt"):
    """Start background sync script and redirect output to a log file.

    Args:
        log_path: File opened in append mode; receives a launch marker line
            followed by the child's combined stdout and stderr.

    Returns:
        The ``subprocess.Popen`` handle of the child, which is started in its
        own session (``start_new_session=True``).
    """
    print("Starting background sync watchdog...")
    child_env = os.environ.copy()
    # Force immediate line flushing so Kaggle log files are populated in real time.
    child_env["PYTHONUNBUFFERED"] = "1"
    # Reuse the interpreter running this script so the child sees the same
    # installed packages; a bare "python" may be absent from PATH or point at
    # a different installation. Fall back to it only if sys.executable is empty.
    interpreter = sys.executable or "python"
    with open(log_path, "a", buffering=1) as log_file:
        log_file.write("[watchdog] launching background_sync.py\n")
        log_file.flush()
        return subprocess.Popen(
            [interpreter, "-u", "background_sync.py"],
            stdout=log_file,
            stderr=subprocess.STDOUT,
            env=child_env,
            start_new_session=True,
        )

120 

121 

def run_sync_self_check(*, log_path: str = "sync_logs.txt") -> int:
    """Run one-shot uploader diagnostics before launching the long-running sync loop.

    Args:
        log_path: File opened in append mode; receives a marker line followed
            by the child's combined stdout and stderr.

    Returns:
        Exit code of ``background_sync.py --self-check``. A non-zero code is
        returned rather than raised, because the call uses ``check=False``.
    """
    print("Running background sync self-check...")
    child_env = os.environ.copy()
    # Unbuffered child output so the log file is populated as the check runs.
    child_env["PYTHONUNBUFFERED"] = "1"
    # Reuse the interpreter running this script so the child sees the same
    # installed packages; a bare "python" may be absent from PATH or point at
    # a different installation. Fall back to it only if sys.executable is empty.
    interpreter = sys.executable or "python"
    with open(log_path, "a", buffering=1) as log_file:
        log_file.write("[watchdog] running background_sync.py --self-check\n")
        log_file.flush()
        result = subprocess.run(
            [interpreter, "-u", "background_sync.py", "--self-check"],
            stdout=log_file,
            stderr=subprocess.STDOUT,
            env=child_env,
            check=False,
        )
    print(f"Background sync self-check exit code: {result.returncode}")
    return result.returncode

139 

140 

def main():
    """Run the watchdog start-up sequence and return a process exit status.

    Returns 1 when no repository directory could be selected, 2 when the
    self-check exits non-zero, and 0 once the background sync was launched.
    An exception raised by ``load_runtime_secrets`` is not caught here.
    """
    sync_log = "sync_logs.txt"

    repo_ready = set_repo_directory()
    if not repo_ready:
        return 1

    load_runtime_secrets()

    if run_sync_self_check(log_path=sync_log) != 0:
        print("Background sync self-check failed. See sync_logs.txt for details.")
        return 2

    sync_proc = start_background_sync(log_path=sync_log)
    # getattr keeps the message printable even if the handle exposes no pid.
    print(
        f"Background sync started with PID {getattr(sync_proc, 'pid', 'unknown')}. "
        "Tail sync_logs.txt to monitor."
    )
    return 0

153 

154 

# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)