Coverage for kaggle_watchdog.py: 81%
86 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 23:06 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 23:06 +0000
1# Copyright 2026 venim1103
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15import os
16import subprocess
18REPO_DIR = "/kaggle/working/mini-mamba-agent-1.58b"
19REQUIRED_SECRET_KEYS = ("HF_TOKEN", "WANDB_API_KEY", "REPO_ID")
20REPO_ENV_VAR = "WATCHDOG_REPO_DIR"
23def _candidate_repo_dirs(explicit_repo_dir: str | None = None) -> list[str]:
24 candidates = []
25 if explicit_repo_dir:
26 candidates.append(explicit_repo_dir)
27 env_repo_dir = os.environ.get(REPO_ENV_VAR)
28 if env_repo_dir:
29 candidates.append(env_repo_dir)
30 candidates.append(REPO_DIR)
31 candidates.append(os.getcwd())
32 candidates.append(os.path.dirname(os.path.abspath(__file__)))
33 # De-duplicate while preserving order.
34 return list(dict.fromkeys(candidates))
37def set_repo_directory(repo_dir: str | None = None, *, logger=print) -> bool:
38 """Switch to a usable repository directory across Kaggle, Colab, or local setups."""
39 for candidate in _candidate_repo_dirs(repo_dir):
40 if os.path.exists(candidate):
41 os.chdir(candidate)
42 logger(f"Directory set to: {os.getcwd()}")
43 return True
44 logger(
45 "Repository not found! Set WATCHDOG_REPO_DIR or run this script from the cloned repo root."
46 )
47 return False
50def _load_kaggle_secrets() -> dict[str, str]:
51 from kaggle_secrets import UserSecretsClient
53 user_secrets = UserSecretsClient()
54 return {key: user_secrets.get_secret(key) for key in REQUIRED_SECRET_KEYS}
57def _load_colab_secrets() -> dict[str, str]:
58 from google.colab import userdata
60 return {key: userdata.get(key) for key in REQUIRED_SECRET_KEYS}
63def load_runtime_secrets(*, logger=print) -> dict[str, str]:
64 """Resolve required secrets from env, then optional notebook providers."""
65 resolved = {
66 key: os.environ.get(key) for key in REQUIRED_SECRET_KEYS if os.environ.get(key)
67 }
68 missing = [key for key in REQUIRED_SECRET_KEYS if key not in resolved]
69 if not missing:
70 logger("Using secrets already present in environment variables")
71 return resolved
73 providers = (
74 ("Kaggle secrets", _load_kaggle_secrets),
75 ("Colab userdata", _load_colab_secrets),
76 )
77 for provider_name, provider_fn in providers:
78 try:
79 provider_values = provider_fn()
80 except Exception:
81 continue
82 for key in missing:
83 value = provider_values.get(key)
84 if value:
85 resolved[key] = value
86 os.environ[key] = value
87 missing = [key for key in REQUIRED_SECRET_KEYS if key not in resolved]
88 if not missing:
89 logger(f"Loaded {', '.join(REQUIRED_SECRET_KEYS)} from {provider_name}")
90 return resolved
92 raise RuntimeError(
93 f"Missing required secrets: {', '.join(missing)}. "
94 "Provide them via environment variables or notebook secret manager."
95 )
98def load_kaggle_secrets(*, logger=print) -> dict[str, str]:
99 """Backward-compatible wrapper around cross-environment secret loading."""
100 return load_runtime_secrets(logger=logger)
103def start_background_sync(*, log_path: str = "sync_logs.txt"):
104 """Start background sync script and redirect output to a log file."""
105 logger_msg = "Starting background sync watchdog..."
106 print(logger_msg)
107 child_env = os.environ.copy()
108 # Force immediate line flushing so Kaggle log files are populated in real time.
109 child_env["PYTHONUNBUFFERED"] = "1"
110 with open(log_path, "a", buffering=1) as log_file:
111 log_file.write("[watchdog] launching background_sync.py\n")
112 log_file.flush()
113 return subprocess.Popen(
114 ["python", "-u", "background_sync.py"],
115 stdout=log_file,
116 stderr=subprocess.STDOUT,
117 env=child_env,
118 start_new_session=True,
119 )
122def run_sync_self_check(*, log_path: str = "sync_logs.txt") -> int:
123 """Run one-shot uploader diagnostics before launching the long-running sync loop."""
124 print("Running background sync self-check...")
125 child_env = os.environ.copy()
126 child_env["PYTHONUNBUFFERED"] = "1"
127 with open(log_path, "a", buffering=1) as log_file:
128 log_file.write("[watchdog] running background_sync.py --self-check\n")
129 log_file.flush()
130 result = subprocess.run(
131 ["python", "-u", "background_sync.py", "--self-check"],
132 stdout=log_file,
133 stderr=subprocess.STDOUT,
134 env=child_env,
135 check=False,
136 )
137 print(f"Background sync self-check exit code: {result.returncode}")
138 return result.returncode
141def main():
142 if not set_repo_directory():
143 return 1
144 load_runtime_secrets()
145 check_code = run_sync_self_check(log_path="sync_logs.txt")
146 if check_code != 0:
147 print("Background sync self-check failed. See sync_logs.txt for details.")
148 return 2
149 proc = start_background_sync(log_path="sync_logs.txt")
150 pid = getattr(proc, "pid", "unknown")
151 print(f"Background sync started with PID {pid}. Tail sync_logs.txt to monitor.")
152 return 0
155if __name__ == "__main__":
156 raise SystemExit(main())